Best Python code snippet using autotest_python
tail_stack_events.py
Source:tail_stack_events.py  
#!/usr/bin/env python
"""Usage:
    tail_stack_events.py [--follow] [--number=<n>] [--depth=<d>] [--max-column-length=<x>] [--profile=<profile>] <stack>
    tail_stack_events.py [--postmortem] [--find-last-failure] [--show-all-failures] [--max-column-length=<x>] [--profile=<profile>] <stack>

Options:
    -f --follow                     Follow the stack events and output new ones as they are received.
    -p <p> --profile=<profile>      The aws profile to use.
    -n <n> --number=<n>             The number of lines to display. [default: 10]
    -d <d> --depth=<d>              The maximum depth to get events for. Use -1 for unlimited depth. [default: 2]
    -m --postmortem                 Find the failures in the last stack update.
    --find-last-failure             Search for the last rollback and show the failures from that update.
                                    By default --postmortem only looks at the latest stack update.
    -x <x> --max-column-length=<x>  The maximum column length for tabular output.
                                    Defaults to 200 for postmortem, 40 otherwise.
    --show-all-failures             Show all failures for the stack update, not just the one that caused the rollback.
    <stack>                         The top-level stack to get events for.
"""
# NOTE: the module docstring above is parsed by docopt at runtime (see main()), so it is
# program input, not just documentation. Keep its layout intact when editing.
import collections
import functools
import logging
import math
import sys
import time
import traceback

import eventlet

# Patch the standard library before the remaining imports are loaded, so that the
# libraries imported below see the patched modules.
eventlet.monkey_patch()

import ansiwrap
import botocore.exceptions
import boto3
import colorama
import docopt
import eventlet.greenpool
import tenacity

STACK_TYPE = "AWS::CloudFormation::Stack"
ELLIPSIS = u"\u2026"
LOG = logging.getLogger(__name__)


def retry(func):
    """Return ``func`` wrapped so that failures are logged and the call is retried.

    Retries wait a random exponential backoff (between 0.1 and 10 seconds). No ``stop``
    condition is passed to tenacity, so the call is repeated until it succeeds.
    Each failure is logged with its traceback before the exception is handed to tenacity.
    """
    tretry = tenacity.retry(
        wait=tenacity.wait_random_exponential(multiplier=1, min=0.1, max=10),
        after=tenacity.after_log(LOG, logging.WARNING),
    )

    @functools.wraps(func)
    def log_exc(*a, **k):
        try:
            return func(*a, **k)
        except Exception:
            # Lazy %-args: the message is only formatted if the record is emitted.
            LOG.exception("Exception calling %r", func)
            raise

    return tretry(log_exc)


class Column(object):
    """Width bookkeeping for one column of the tabular output."""

    def __init__(self, mvl, ml):
        # Longest value seen so far for this column (updated by update_columns).
        self.max_value_length = mvl
        # Hard cap on the rendered width of the column.
        self.max_length = ml


@retry
def get_stack(stack_name_or_arn):
    """Return the CloudFormation Stack resource for a stack name or ARN.

    The returned resource is always keyed by the value of ``stack.stack_id``, even when
    a plain stack name was passed in.
    """
    cf = boto3.resource("cloudformation")
    stack = cf.Stack(stack_name_or_arn)
    # Switch to the ARN if a stack name was passed in
    if stack.stack_id != stack_name_or_arn:
        stack = cf.Stack(stack.stack_id)
    return stack


# TODO: make the retries per-api-call rather than on this function to reduce repeated API calls
@retry
def get_nested_stacks(stack_name_or_arn, depth=None, status_check=None):
    """Collect a stack and its nested stacks into a ``{stack_id: Stack}`` dict.

    :param stack_name_or_arn: the stack to start from.
    :param depth: how many levels of nesting to descend; ``None`` means unlimited and
        ``0`` returns only the starting stack.
    :param status_check: optional predicate called with a nested stack's resource
        status; only nested stacks for which it returns true are followed. When it is
        ``None`` every nested stack is followed.
    """
    stack = get_stack(stack_name_or_arn)
    stacks = {stack.stack_id: stack}
    if depth == 0:
        return stacks
    child_depth = None if depth is None else depth - 1
    for sub in stack.resource_summaries.all():
        if sub.resource_type != STACK_TYPE:
            continue
        # "no filter" must mean "accept everything"; the previous `is not None and`
        # form silently skipped every nested stack when no predicate was supplied.
        if status_check is None or status_check(sub.resource_status):
            stacks.update(
                get_nested_stacks(sub.physical_resource_id, child_depth, status_check)
            )
    return stacks


def get_stack_events(stack, limit=5, _mem={}):
    """Return events for ``stack``, sorted by ascending timestamp.

    ``_mem`` is deliberately a shared mutable default: it persists between calls and
    remembers, per stack id, the timestamp of the newest event returned last time. On
    later calls further pages are fetched until the remembered timestamp is reached,
    so events that arrived between polls are not missed.

    NOTE(review): ``limit`` is accepted for interface compatibility but is not applied.
    The per-stack trimming in the earlier code sat behind a check that could never be
    true (``_mem`` was written immediately before testing ``not in _mem``). Callers
    already trim what they display, and honouring ``limit`` here would drop events of
    newly discovered nested stacks while following, so the effective behaviour
    (return everything fetched) is kept.

    :returns: a list of events; empty when the first page cannot be fetched.
    """
    # TODO: preload _mem with the timestamp of the first event in the parent stack that caused us to add the stack
    pages = stack.events.pages()
    events = next_page(pages)
    if not events:
        _mem.pop(stack.stack_id, None)
        return []
    events.sort(key=lambda e: e.timestamp)
    if stack.stack_id in _mem:
        last_seen = _mem[stack.stack_id]
        while events[0].timestamp > last_seen:
            # next_page() turns exhaustion / client errors into a falsy result instead
            # of letting StopIteration escape from a bare next().
            more = next_page(pages)
            if not more:
                break
            more.sort(key=lambda e: e.timestamp)
            events = more + events
    _mem[stack.stack_id] = events[-1].timestamp
    return events


def get_events(stacks, limit=5):
    """Fetch events for every stack in ``stacks`` concurrently and merge them.

    Stacks for which no events come back are removed from ``stacks`` in place.

    :returns: all fetched events sorted by ascending timestamp.
    """
    pool = eventlet.greenpool.GreenPool(5)
    stack_ids = list(stacks.keys())
    stack_objs = list(stacks.values())
    all_events = []
    remove_stacks = set()
    results = pool.starmap(get_stack_events, ((s, limit) for s in stack_objs))
    for stack_id, events in zip(stack_ids, results):
        if not events:
            remove_stacks.add(stack_id)
            continue
        all_events.extend(events)
    for stack_id in remove_stacks:
        del stacks[stack_id]
    all_events.sort(key=lambda e: e.timestamp)
    return all_events


def update_columns(columns, events):
    """Grow each column's ``max_value_length`` to fit the given events."""
    for event in events:
        for name, column in columns.items():
            column.max_value_length = max(
                column.max_value_length, len(str(getattr(event, name)))
            )


def format_column(column_name, column, value):
    """Render one cell: truncate to the column cap, colour statuses, left-pad.

    Values wider than ``column.max_length`` are shortened to head + ellipsis + tail.
    For the ``resource_status`` column the text is wrapped in a colour chosen from the
    status. The result is padded on the left so that cells are right-aligned.
    """
    text = str(value)
    if ansiwrap.ansilen(text) > column.max_length:
        half_length = max(column.max_length - 1, 0) / 2.0
        head = int(math.floor(half_length))
        tail = int(math.ceil(half_length))
        # Slice from an explicit start index: text[-0:] would return the whole string.
        text = "%s%s%s" % (text[:head], ELLIPSIS, text[len(text) - tail:])
    if column_name == "resource_status":
        uvalue = value.upper()
        if "FAIL" in uvalue:
            color = colorama.Fore.RED
        elif "ROLLBACK" in uvalue:
            color = colorama.Fore.YELLOW
        elif "IN_PROGRESS" in uvalue:
            color = colorama.Fore.BLUE
        elif uvalue == "DELETE_COMPLETE":
            color = colorama.Fore.LIGHTBLACK_EX
        elif "COMPLETE" in uvalue:
            color = colorama.Fore.GREEN
        else:
            color = colorama.Fore.WHITE
        # Colour is applied after truncation so the escape sequences are never cut.
        text = "%s%s%s" % (color, text, colorama.Style.RESET_ALL)
    padding = " " * max(
        0,
        min(column.max_value_length, column.max_length) - ansiwrap.ansilen(text),
    )
    return "%s%s" % (padding, text)


def output_events(columns, events):
    """Print one formatted row per event, columns separated by two spaces."""
    fmt = "  ".join("%s" for _ in columns)
    for e in events:
        print(fmt % tuple(format_column(n, c, getattr(e, n)) for n, c in columns.items()))


def update_stacks_from_events(stacks, events, main_stack, max_depth=None):
    """Add or drop nested stacks in ``stacks`` based on the stack events seen.

    A stack whose latest stack-level event ends in ``COMPLETE`` is dropped (the main
    stack and the last remaining stack are always kept); a stack whose latest
    stack-level event is anything else is added if it is not tracked yet.

    ``stacks`` is modified in place.
    """
    cf = boto3.resource("cloudformation")
    to_remove = set()
    to_add = set()
    # NOTE: Assuming that events are in proper order here
    for event in events:
        if event.resource_type != STACK_TYPE:
            continue
        stack_id = event.physical_resource_id
        if event.resource_status.endswith("COMPLETE"):
            to_add.discard(stack_id)
            to_remove.add(stack_id)
        else:
            to_remove.discard(stack_id)
            if not stack_id:
                LOG.debug("stack event has an empty physical_resource_id: %r", event)
                continue
            to_add.add(stack_id)
    for stack_id in to_remove:
        if stack_id != main_stack.stack_id and stack_id in stacks:
            # Never empty the dict completely.
            if len(stacks) != 1:
                del stacks[stack_id]
    for stack_id in to_add:
        # NOTE(review): nesting depth is estimated from the number of "-" separators
        # in the stack id - confirm this heuristic against real nested stack ids.
        if stack_id not in stacks and (
            max_depth is None or len(stack_id.split("-")) - 1 <= max_depth
        ):
            stack = cf.Stack(stack_id)
            # Force loading with a retry as it can incur a potentially-failing API call
            retry(lambda: stack.stack_id)()
            stacks[stack_id] = stack


def do_tail_stack_events(main_stack, num, columns, headers, max_depth, follow):
    """Print the latest ``num`` events and, if ``follow`` is set, keep polling.

    :param main_stack: the top-level stack resource.
    :param num: number of events to show initially.
    :param columns: ordered mapping of attribute name to :class:`Column`.
    :param headers: header row object with the same attributes as an event.
    :param max_depth: maximum nesting depth for newly discovered stacks (``None`` = unlimited).
    :param follow: when true, poll every 5 seconds and print new events until interrupted.
    """
    stacks = get_nested_stacks(
        main_stack.stack_id, status_check=lambda status: "IN_PROGRESS" in status
    )
    print("Getting events...")
    events = get_events(stacks, limit=num)
    outputted = {e.id for e in events}
    update_columns(columns, events[-num:])
    update_stacks_from_events(stacks, events, main_stack, max_depth=max_depth)
    output_events(columns, [headers])
    output_events(columns, events[-num:])
    # A stack may yield no events at all; don't crash on events[-1] in that case.
    last_event_timestamp = events[-1].timestamp if events else None
    if not follow:
        return
    # TODO: Keep track of last updated timestamp for each stack and don't ask for more events
    # if it hasn't changed?
    # This would require updating the stack every time, though, which means adding another API call.
    while True:
        try:
            time.sleep(5)
            events = get_events(stacks)
            new_events = []
            for event in events:
                # Don't re-output events and don't output events older than the latest event shown (not doing
                # this means we can get events from the previous stack updates, which we don't want).
                if event.id in outputted:
                    continue
                if (
                    last_event_timestamp is not None
                    and event.timestamp < last_event_timestamp
                ):
                    continue
                new_events.append(event)
            if not new_events:
                continue
            last_event_timestamp = new_events[-1].timestamp
            update_stacks_from_events(stacks, events, main_stack, max_depth=max_depth)
            # TODO: If an event for a stack comes in that isn't in stacks, add it to stacks.
            # TODO: Remove a stack from stacks if there is an "end" event? DELETE_COMPLETE or UPDATE_COMPLETE perhaps?
            #       If adding a stack is implemented this should be fine.
            update_columns(columns, new_events)
            # TODO: Only output headers if the column width has changed or we've output more than X rows since the
            # last header.
            output_events(columns, [headers])
            output_events(columns, new_events)
            # TODO: outputted grows constantly over time, it needs to be culled at some point.
            # Potential fix: outputted is replaced each time we loop with only the ids from the loop.
            outputted.update(event.id for event in new_events)
        except Exception:
            # Best effort: report the problem and keep following.
            traceback.print_exc()


@retry
def next_page(pages):
    """Return the next page of ``pages`` as a list, or ``None`` if there is none.

    Both an exhausted iterator and a botocore ``ClientError`` yield ``None``; because
    those exceptions are handled here they do not trigger the surrounding retry.
    """
    try:
        return list(next(pages))
    except botocore.exceptions.ClientError:
        LOG.debug("ClientError while fetching the next page of events", exc_info=True)
        return None
    except StopIteration:
        return None


def get_stack_failure_events(stack, columns, headers, start_func=None):
    """Return the failure events of one stack update, sorted by ascending timestamp.

    Events are read page by page in the order the API returns them. If ``start_func``
    is given, events are skipped until it returns true for one of them. Reading stops
    at the first stack-level ``*COMPLETE`` event for this stack, except that an
    ``UPDATE_ROLLBACK_COMPLETE`` seen before any other considered event does not stop it.

    ``columns`` and ``headers`` are unused here; they are kept so existing callers
    keep working.

    :returns: the collected events whose status contains ``FAIL``.
    """
    pages = stack.events.pages()
    events = []
    end = False
    ready = start_func is None
    first = True
    while not end:
        page = next_page(pages)
        if not page:
            break
        for event in page:
            events.append(event)
            if not ready:
                if start_func(event):
                    ready = True
                continue
            status = event.resource_status.upper()
            if (
                event.resource_type == STACK_TYPE
                and status.endswith("COMPLETE")
                and event.physical_resource_id == stack.stack_id
                and (not first or status != "UPDATE_ROLLBACK_COMPLETE")
            ):
                end = True
                break
            first = False
    return sorted(
        (event for event in events if "FAIL" in event.resource_status.upper()),
        key=lambda e: e.timestamp,
    )


def do_postmortem(
    stack, columns, headers, search_for_failure=False, show_all_failures=False
):
    """Print the failure events behind the last (or last failed) stack update.

    Starting at ``stack``, the failure events are collected; while the first failure is
    a nested stack whose reason contains "failed to", the search continues inside that
    nested stack. Exits with status 1 if the top-level stack has no failure events.

    :param search_for_failure: start from the most recent ``UPDATE_ROLLBACK_COMPLETE``
        of the top-level stack instead of from the newest event.
    :param show_all_failures: keep every failure per stack instead of only the first.
    """
    print("Getting events...")
    root_stack_id = stack.stack_id
    if search_for_failure:
        def start_func(event):
            return (
                event.resource_type == STACK_TYPE
                and event.resource_status.upper() == "UPDATE_ROLLBACK_COMPLETE"
                and event.physical_resource_id == root_stack_id
            )
    else:
        start_func = None
    top_level = True
    events = []
    while True:
        new_events = get_stack_failure_events(
            stack, columns, headers, start_func=start_func
        )
        if not new_events:
            if top_level:
                print(
                    "The last stack update succeeded or there is an ongoing update which has no failures yet."
                )
                sys.exit(1)
            else:
                print("No failure events found in nested stack %r." % (stack,))
                break
        if not show_all_failures:
            new_events = [new_events[0]]
        events.extend(new_events)
        fail_event = new_events[0]
        # The reason can be missing on an event; treat that as "no reason text".
        reason = (fail_event.resource_status_reason or "").lower()
        if fail_event.resource_type != STACK_TYPE or "failed to" not in reason:
            break
        # Bind the timestamp now so the predicate does not depend on later rebinding.
        start_func = lambda event, _ts=fail_event.timestamp: event.timestamp <= _ts
        stack = get_stack(fail_event.physical_resource_id)
    events.sort(key=lambda e: e.timestamp, reverse=show_all_failures)
    update_columns(columns, events)
    output_events(columns, [headers])
    output_events(columns, events)


def main():
    """Parse the command line and run either the tail or the postmortem mode."""
    args = docopt.docopt(__doc__)
    if args["--profile"]:
        boto3.setup_default_session(profile_name=args["--profile"])
    postmortem = args["--postmortem"]
    max_column_length = args["--max-column-length"]
    if max_column_length is None:
        max_column_length = 200 if postmortem else 40
    max_column_length = int(max_column_length)
    # Attribute name on the event -> header title, in display order.
    titles = collections.OrderedDict(
        [
            ("timestamp", "Timestamp"),
            ("stack_name", "Stack Name"),
            ("resource_type", "Resource Type"),
            ("logical_resource_id", "Logical Resource ID"),
            ("resource_status", "Status"),
            ("resource_status_reason", "Reason"),
        ]
    )
    columns = collections.OrderedDict(
        (name, Column(0, max_column_length)) for name in titles
    )
    # The header row mimics an event object so it can go through the same formatting.
    headers = collections.namedtuple("Headers", list(columns))(
        *[
            colorama.Style.BRIGHT + title + colorama.Style.RESET_ALL
            for title in titles.values()
        ]
    )
    update_columns(columns, [headers])
    print("Getting stack...")
    main_stack = get_stack(args["<stack>"])
    if postmortem:
        do_postmortem(
            main_stack,
            columns,
            headers,
            search_for_failure=args["--find-last-failure"],
            show_all_failures=args["--show-all-failures"],
        )
    else:
        num = int(args["--number"])
        max_depth = int(args["--depth"])
        if max_depth == -1:
            max_depth = None
        do_tail_stack_events(
            main_stack, num, columns, headers, max_depth, args["--follow"]
        )


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # NOTE(review): the handler body is elided in the source listing; exiting
        # quietly on Ctrl-C is assumed here - confirm against the original script.
        pass
Source:atest.py  
...76                action_obj.output(results)77            except Exception:78                traceback.print_exc()79    finally:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
