Best Python code snippet using localstack_python
metric_aggregator.py
Source:metric_aggregator.py  
# NOTE: the listing starts in the middle of `_generate_details_block_html`; the
# beginning of that function (and everything before it, including the imports
# and module-level templates/constants) is not part of this excerpt. The visible
# tail of that function is kept here verbatim for reference:
#
#         else:
#             output += f"  {template_not_implemented_html}{e}</input><br/>\n"
#     output += "  </details></li>\n"
#     return output


def _service_edition_label(service_attributes: dict) -> str:
    """Return the human-readable edition label for a service.

    :param service_attributes: dict with the boolean keys ``pro`` and ``community``
    :return: ``"community"``, ``"pro only"`` or ``"community, and pro features"``
    """
    if not service_attributes["pro"]:
        return "community"
    if not service_attributes["community"]:
        return "pro only"
    return "community, and pro features"


def _operations_of(details: dict) -> dict:
    """Return the per-operation entries of a service, without the ``service_attributes`` entry."""
    return {name: entry for name, entry in details.items() if name != "service_attributes"}


def _coverage_percentage(operation_tested: int, operation_counter: int) -> float:
    """Return the share of tested operations in percent; 0.0 for a service without operations."""
    if not operation_counter:
        return 0.0
    return operation_tested / operation_counter * 100


def create_simple_html_report(file_name, metrics):
    """Append an HTML coverage report for all services in ``metrics`` to ``file_name``.

    The report is written service by service (the file is opened in append mode for
    each service). ``metrics`` is not modified.

    :param file_name: path of the HTML file to append to
    :param metrics: mapping ``service -> {"service_attributes": {...}, <operation>: {...}, ...}``
    """
    template_aws_validated = '<span title="AWS validated">✨</span>'
    template_snapshot_verified = '<span title="Snapshot verified">📸</span>'
    template_snapshot_skipped = '<span title="Snapshot skipped">🚫</span>'

    output = "<html><h1> Metric Collection Report of Integration Tests </h1>\n\n"
    output += "<div><b>Disclaimer</b>: naive calculation of test coverage - if operation is called at least once, it is considered as 'covered'.<br/>\n"
    output += "✨: aws_validated or using the snapshot fixture<br/>\n"
    output += "📸: using the snapshot fixture without any skip_snapshot_verify<br/>\n"
    output += "🚫: using the snapshot fixture but uses skip_snapshot_verify<br/></div>\n"

    for service in sorted(metrics):
        details = metrics[service]
        edition = _service_edition_label(details["service_attributes"])
        # work on a filtered view instead of deleting "service_attributes" from the
        # caller's dict, so the same metrics can be rendered more than once
        operations = _operations_of(details)

        output += f"<h1> {service} </h1>\n<div>\n{edition}<br/>\n</div>\n"

        operation_tested = 0
        parts = []
        for operation in sorted(operations):
            op_details = operations[operation]
            if op_details.get("invoked", 0) > 0:
                operation_tested += 1
                validated = bool(op_details.get("aws_validated") or op_details.get("snapshot"))
                aws_validated = template_aws_validated if validated else ""
                if not validated:
                    snapshot = ""
                elif op_details.get("snapshot_skipped_paths"):
                    snapshot = template_snapshot_skipped
                else:
                    snapshot = template_snapshot_verified
                parts.append(
                    f"<p>{template_implemented_html}{operation} {aws_validated} {snapshot}</input>\n"
                )
            else:
                parts.append(f"<p>{template_not_implemented_html}{operation}</input>\n")

            parts.append("<ul>")
            if op_details.get("parameters"):
                parts.append(
                    _generate_details_block_html("parameters  hit", op_details["parameters"])
                )
            if op_details.get("errors"):
                parts.append(_generate_details_block_html("errors hit", op_details["errors"]))
            parts.append("</ul>")
            parts.append("</p>")

        coverage = _coverage_percentage(operation_tested, len(operations))
        body = "".join(parts)
        output += f"<p><details><summary>{coverage:.2f}% test coverage</summary>\n\n{body}\n</details></p>\n"

        # the report contains emoji, so do not rely on the platform default encoding
        with open(file_name, "a", encoding="utf-8") as fd:
            fd.write(f"{output}\n")
        output = ""


def _generate_details_block(details_title: str, details: dict) -> str:
    """Render a collapsible markdown/HTML ``<details>`` block listing every entry of ``details``.

    :param details_title: text shown in the ``<summary>`` element
    :param details: mapping ``name -> hit count``; entries with a count > 0 are
        rendered with the "implemented" item template, all others with the
        "not implemented" one
    :return: the rendered block
    """
    lines = [f"  <details><summary>{details_title}</summary>\n\n"]
    for name, count in details.items():
        template = template_implemented_item if count > 0 else template_not_implemented_item
        lines.append(f"  {template}{name}\n")
    lines.append("  </details>\n")
    return "".join(lines)


def create_readable_report(file_name: str, metrics: dict):
    """Append a markdown coverage report for all services in ``metrics`` to ``file_name``.

    The report is written service by service (the file is opened in append mode for
    each service). ``metrics`` is not modified.

    :param file_name: path of the markdown file to append to
    :param metrics: mapping ``service -> {"service_attributes": {...}, <operation>: {...}, ...}``
    """
    output = "# Metric Collection Report of Integration Tests #\n\n"
    output += "**__Disclaimer__**: naive calculation of test coverage - if operation is called at least once, it is considered as 'covered'.\n"
    output += f"{AWS_VALIDATED}: aws_validated or using the snapshot fixture\n"
    output += f"{SNAPSHOT}: using the snapshot fixture without any skip_snapshot_verify\n"
    # this legend entry describes the skip-verify marker (it previously printed AWS_VALIDATED)
    output += f"{SNAPSHOT_SKIP_VERIFY}: using the snapshot fixture but uses skip_snapshot_verify\n"

    for service in sorted(metrics):
        details = metrics[service]
        edition = _service_edition_label(details["service_attributes"])
        # filtered view instead of mutating the caller's dict
        operations = _operations_of(details)

        output += f"## {service} ##\n{edition}\n"

        operation_tested = 0
        parts = []
        for operation in sorted(operations):
            op_details = operations[operation]
            if op_details.get("invoked", 0) > 0:
                operation_tested += 1
                validated = bool(op_details.get("aws_validated") or op_details.get("snapshot"))
                aws_validated = AWS_VALIDATED if validated else ""
                if not validated:
                    snapshot = ""
                elif op_details.get("snapshot_skipped_paths"):
                    snapshot = SNAPSHOT_SKIP_VERIFY
                else:
                    snapshot = SNAPSHOT
                parts.append(f"{template_implemented_item}{operation} {aws_validated} {snapshot}\n")
            else:
                parts.append(f"{template_not_implemented_item}{operation}\n")

            if op_details.get("parameters"):
                parts.append(_generate_details_block("parameters  hit", op_details["parameters"]))
            if op_details.get("errors"):
                parts.append(_generate_details_block("errors hit", op_details["errors"]))

        coverage = _coverage_percentage(operation_tested, len(operations))
        body = "".join(parts)
        output += f"<details><summary>{coverage:.2f}% test coverage</summary>\n\n{body}\n</details>\n"

        # the markers may be non-ASCII, so do not rely on the platform default encoding
        with open(file_name, "a", encoding="utf-8") as fd:
            fd.write(f"{output}\n")
        output = ""


def _init_service_metric_counter() -> Dict:
    """Build the empty metric structure for every service in ``SERVICE_PLUGINS``.

    For each service that can be loaded, the result contains a ``service_attributes``
    entry (``pro`` / ``community`` flags) and one entry per operation with zeroed
    counters for the invocation, every input parameter and every declared error.
    Services that fail to load are skipped (best effort) and logged on debug level.

    :return: mapping ``service -> {"service_attributes": {...}, <operation>: {...}, ...}``
    """
    from localstack.aws.spec import load_service

    metric_recorder = {}
    for s, provider in SERVICE_PLUGINS.api_provider_specs.items():
        try:
            service = load_service(s)
            ops = {
                "service_attributes": {"pro": "pro" in provider, "community": "default" in provider}
            }
            for op in service.operation_names:
                operation_model = service.operation_model(op)
                attributes = {"invoked": 0, "aws_validated": False, "snapshot": False}
                if hasattr(operation_model.input_shape, "members"):
                    attributes["parameters"] = dict.fromkeys(operation_model.input_shape.members, 0)
                if hasattr(operation_model, "error_shapes"):
                    attributes["errors"] = {e.name: 0 for e in operation_model.error_shapes}
                ops[op] = attributes
            metric_recorder[s] = ops
        except Exception:
            # deliberately best effort: a service that cannot be loaded is just left out
            LOG.debug("cannot load service '%s'", s, exc_info=True)
    return metric_recorder


def print_usage():
    """Print the command line usage of this script."""
    print("missing argument: directory")
    # the accepted values must match the check in main(): "amd64" or "arm64"
    print("usage: python metric_aggregator.py <dir-to-raw-csv-metric> [amd64|arm64]")


def write_json(file_name: str, metric_dict: dict):
    """Write ``metric_dict`` as pretty-printed JSON with sorted keys to ``file_name`` (overwriting it)."""
    with open(file_name, "w") as fd:
        fd.write(json.dumps(metric_dict, indent=2, sort_keys=True))


def _print_diff(metric_recorder_internal, metric_recorder_external):
    """Print every operation that was invoked internally but has no external invocation.

    Both arguments are expected to be metric structures of the form
    ``service -> operation -> {"invoked": <count>, ...}``; non-dict entries and
    entries without invocations are ignored.
    """
    for service, operations in metric_recorder_internal.items():
        for operation, entry in operations.items():
            if not (isinstance(entry, dict) and entry.get("invoked")):
                continue
            if entry["invoked"] > 0 and not metric_recorder_external[service][operation]["invoked"]:
                print(f"found invocation mismatch: {service}.{operation}")


def append_row_to_raw_collection(collection_raw_csv_file_name, row, arch):
    """Append ``row`` extended by the ``arch`` column to the raw collection CSV file.

    The passed ``row`` is left untouched, so callers do not need to copy it first.

    :param collection_raw_csv_file_name: path of the CSV file to append to
    :param row: list of column values of the raw metric
    :param arch: architecture label stored in the additional last column
    """
    # newline="" is what the csv module expects for files it writes to
    with open(collection_raw_csv_file_name, "a", newline="") as fd:
        csv.writer(fd).writerow([*row, arch])


def aggregate_recorded_raw_data(
    base_dir: str, collection_raw_csv: Optional[str] = None, collect_for_arch: Optional[str] = ""
) -> dict:
    """Aggregate all ``metric-report-raw-data-*.csv`` files found below ``base_dir``.

    :param base_dir: directory that is searched recursively for raw metric CSV files
    :param collection_raw_csv: optional CSV file every considered raw row is appended to
        (with an additional ``arch`` column derived from the file path)
    :param collect_for_arch: if set, only files whose path contains this string are
        aggregated and collected
    :return: the metric structure created by ``_init_service_metric_counter`` with the
        counters updated from the raw data
    :raises KeyError: if a row refers to a service or operation that is not part of
        the initialized metric structure
    """
    recorded = _init_service_metric_counter()

    for path in Path(base_dir).rglob("metric-report-raw-data-*.csv"):
        path_str = str(path)
        print(f"checking {path_str}")

        # both values only depend on the file, not on the individual row
        if "arm64" in path_str:
            arch = "arm64"
        elif "amd64" in path_str:
            arch = "amd64"
        else:
            arch = ""
        # only aggregate all files if no specific target was requested
        selected = not collect_for_arch or collect_for_arch in path_str

        with open(path, "r", newline="") as csv_obj:
            reader = csv.reader(csv_obj)
            # skip the header (tolerating a completely empty file)
            next(reader, None)
            for row in reader:
                if collection_raw_csv and selected:
                    append_row_to_raw_collection(collection_raw_csv, row, arch)

                metric: Metric = Metric(*row)
                if metric.xfail == "True":
                    print(f"test {metric.node_id} marked as xfail")
                    continue
                if not selected:
                    continue

                ops = recorded[metric.service][metric.operation]
                errors = ops.setdefault("errors", {})
                if metric.exception:
                    # count per exception name; the counter lives in `errors`, not in `ops`
                    errors[metric.exception] = errors.get(metric.exception, 0) + 1
                elif int(metric.response_code) >= 300:
                    for expected_error in errors:
                        if expected_error in metric.response_data:
                            # assume we have a match
                            errors[expected_error] += 1
                            LOG.warning(
                                "Exception assumed for %s.%s: code %s",
                                metric.service,
                                metric.operation,
                                metric.response_code,
                            )
                            break

                ops["invoked"] += 1
                if metric.snapshot == "True":
                    ops["snapshot"] = True  # TODO snapshot currently includes also "skip_verify"
                    ops["snapshot_skipped_paths"] = metric.snapshot_skipped_paths or ""
                if metric.aws_validated == "True":
                    ops["aws_validated"] = True

                params = ops.setdefault("parameters", {})
                if not metric.parameters:
                    params["_none_"] = params.get("_none_", 0) + 1
                else:
                    for p in metric.parameters.split(","):
                        # also count parameters that are not part of the service spec
                        params[p] = params.get(p, 0) + 1

                test_list = ops.setdefault("tests", [])
                if metric.node_id not in test_list:
                    test_list.append(metric.node_id)
    return recorded


def main():
    """Entry point: aggregate the raw metric CSV files of a directory and write the reports.

    Usage: ``python metric_aggregator.py <dir-to-raw-csv-metric> [amd64|arm64]``

    Creates ``<dir>/parity_metrics`` containing the collected raw CSV data, the
    aggregated metrics as JSON and the HTML report.
    """
    if len(sys.argv) < 2 or not Path(sys.argv[1]).is_dir():
        print_usage()
        return

    base_dir = sys.argv[1]
    collect_for_arch = ""
    if len(sys.argv) == 3:
        collect_for_arch = sys.argv[2]
        if collect_for_arch not in ("amd64", "arm64"):
            print_usage()
            return
        print(
            f"Set target to '{collect_for_arch}' - will only aggregate for these test results. Raw collection of all files.\n"
        )

    # TODO: removed splitting of internal/external recorded calls, as some pro tests use 'internals' to connect to service
    metrics_path = os.path.join(base_dir, "parity_metrics")
    Path(metrics_path).mkdir(parents=True, exist_ok=True)
    # %S = seconds; the previously used lowercase %s is a non-portable platform extension
    dtime = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d-%H-%M-%S")

    collection_raw_csv = os.path.join(metrics_path, f"raw-collected-data-{dtime}.csv")
    with open(collection_raw_csv, "w", newline="") as fd:
        csv.writer(fd).writerow([*Metric.RAW_DATA_HEADER, "arch"])

    recorded_metrics = aggregate_recorded_raw_data(base_dir, collection_raw_csv, collect_for_arch)
    write_json(
        os.path.join(
            metrics_path,
            f"metric-report-{dtime}{collect_for_arch}.json",
        ),
        recorded_metrics,
    )
    # filename = os.path.join(metrics_path, f"metric-report-{dtime}{collect_for_arch}.md")
    # create_readable_report(filename, recorded_metrics)
    filename = os.path.join(metrics_path, f"metric-report-{dtime}{collect_for_arch}.html")
    create_simple_html_report(filename, recorded_metrics)


# The listing is truncated after the script guard; the original text continues with:
#
# if __name__ == "__main__":...
#
# Trailing page text that was fused onto the last code line, preserved verbatim:
#
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
