Best Python code snippet using lemoncheesecake
test_store_failure_lines.py
Source:test_store_failure_lines.py  
import json

import pytest
import responses
from django.conf import settings
from requests.exceptions import HTTPError

from treeherder.log_parser.failureline import (
    store_failure_lines,
    write_failure_lines,
    get_group_results,
)
from treeherder.model.models import FailureLine, Group, JobLog, GroupStatus

from ..sampledata import SampleData

# URL every test registers with the ``responses`` mock and stores on the JobLog.
LOG_URL = 'http://my-log.mozilla.org'


def _read_sample_log(sample_name):
    """Return the text of the sample log called *sample_name*.

    The file is always decoded as UTF-8 so the result does not depend on the
    platform's default encoding.
    """
    log_path = SampleData().get_log_path(sample_name)
    with open(log_path, encoding='utf8') as log_handler:
        return log_handler.read()


def _create_mocked_job_log(test_job, body, status=200, **response_kwargs):
    """Register a mocked GET for ``LOG_URL`` and return a JobLog pointing at it.

    Extra keyword arguments are forwarded to ``responses.add`` (for example
    ``content_type``).
    """
    responses.add(responses.GET, LOG_URL, body=body, status=status, **response_kwargs)
    return JobLog.objects.create(job=test_job, name="errorsummary_json", url=LOG_URL)


def test_store_error_summary(activate_responses, test_repository, test_job):
    """A plain error summary yields one FailureLine tied to the job and repository."""
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("plain-chunked_errorsummary.log")
    )

    store_failure_lines(log_obj)

    assert FailureLine.objects.count() == 1
    failure = FailureLine.objects.get(pk=1)
    assert failure.job_guid == test_job.guid
    assert failure.repository == test_repository


def test_store_error_summary_default_group(activate_responses, test_repository, test_job):
    """A summary whose ``group`` is ``"default"`` still yields one FailureLine."""
    resp_body = json.loads(_read_sample_log("plain-chunked_errorsummary.log"))
    resp_body["group"] = "default"
    log_obj = _create_mocked_job_log(test_job, json.dumps(resp_body))

    store_failure_lines(log_obj)

    assert FailureLine.objects.count() == 1


def test_store_error_summary_truncated(activate_responses, test_repository, test_job, monkeypatch):
    """With the cutoff set to 5, five lines plus one ``truncated`` line are stored."""
    monkeypatch.setattr(settings, 'FAILURE_LINES_CUTOFF', 5)
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("plain-chunked_errorsummary_10_lines.log")
    )

    store_failure_lines(log_obj)

    assert FailureLine.objects.count() == 5 + 1
    failure = FailureLine.objects.get(action='truncated')
    assert failure.job_guid == test_job.guid
    assert failure.repository == test_repository


def test_store_error_summary_astral(activate_responses, test_repository, test_job):
    """Fields of the "astral" sample log are stored unchanged.

    NOTE(review): the ``<U+...>`` tokens below appear verbatim in this listing;
    presumably they stand in for astral-plane characters of the upstream test.
    Verify against the upstream file before relying on them.
    """
    log_obj = _create_mocked_job_log(
        test_job,
        _read_sample_log("plain-chunked_errorsummary_astral.log"),
        content_type="text/plain;charset=utf-8",
    )

    store_failure_lines(log_obj)

    assert FailureLine.objects.count() == 1
    failure = FailureLine.objects.get(pk=1)
    assert failure.job_guid == test_job.guid
    assert failure.repository == test_repository
    assert (
        failure.test
        == "toolkit/content/tests/widgets/test_videocontrols_video_direction.html <U+01F346>"
    )
    assert failure.subtest == "Test timed out. <U+010081>"
    assert failure.message == "<U+0F0151>"
    assert failure.stack.endswith("<U+0F0151>")
    assert failure.stackwalk_stdout is None
    assert failure.stackwalk_stderr is None


def test_store_error_summary_404(activate_responses, test_repository, test_job):
    """A 404 on the log URL marks the JobLog as FAILED without raising."""
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("plain-chunked_errorsummary.log"), status=404
    )

    store_failure_lines(log_obj)

    log_obj.refresh_from_db()
    assert log_obj.status == JobLog.FAILED


def test_store_error_summary_500(activate_responses, test_repository, test_job):
    """A 500 on the log URL raises HTTPError and marks the JobLog as FAILED."""
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("plain-chunked_errorsummary.log"), status=500
    )

    with pytest.raises(HTTPError):
        store_failure_lines(log_obj)

    log_obj.refresh_from_db()
    assert log_obj.status == JobLog.FAILED


def test_store_error_summary_duplicate(activate_responses, test_repository, test_job):
    """Writing a line that was already written does not store it a second time."""
    log_obj = JobLog.objects.create(job=test_job, name="errorsummary_json", url=LOG_URL)

    write_failure_lines(
        log_obj, [{"action": "log", "level": "debug", "message": "test", "line": 1}]
    )
    write_failure_lines(
        log_obj,
        [
            {"action": "log", "level": "debug", "message": "test", "line": 1},
            {"action": "log", "level": "debug", "message": "test 1", "line": 2},
        ],
    )

    assert FailureLine.objects.count() == 2


def test_store_error_summary_group_status(activate_responses, test_repository, test_job):
    """The mochitest sample yields 5 failure lines, 28 OK groups and 1 ERROR group."""
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("mochitest-browser-chrome_errorsummary.log")
    )

    store_failure_lines(log_obj)

    assert FailureLine.objects.count() == 5
    ok_groups = Group.objects.filter(group_result__status=GroupStatus.OK)
    error_groups = Group.objects.filter(group_result__status=GroupStatus.ERROR)
    assert ok_groups.count() == 28
    assert error_groups.count() == 1
    assert log_obj.groups.count() == 29
    assert log_obj.groups.all().first().name == "dom/base/test/browser.ini"
    assert ok_groups.first().name == "dom/base/test/browser.ini"
    assert error_groups.first().name == "toolkit/components/pictureinpicture/tests/browser.ini"


def test_get_group_results(activate_responses, test_repository, test_job):
    """Group results for the push are keyed by task id, then by group name."""
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("mochitest-browser-chrome_errorsummary.log")
    )
    store_failure_lines(log_obj)

    groups = get_group_results(test_job.push)

    task_groups = groups['V3SVuxO8TFy37En_6HcXLs']
    assert task_groups['dom/base/test/browser.ini']


def test_get_group_results_with_colon(activate_responses, test_repository, test_job):
    """Group names containing a colon are kept whole as result keys."""
    log_obj = _create_mocked_job_log(
        test_job, _read_sample_log("xpcshell-errorsummary-with-colon.log")
    )
    store_failure_lines(log_obj)

    groups = get_group_results(test_job.push)

    task_groups = groups['V3SVuxO8TFy37En_6HcXLs']
    assert task_groups[
        'toolkit/components/extensions/test/xpcshell/xpcshell-e10s.ini:toolkit/components/extensions/test/xpcshell/xpcshell-content.ini'
    ]
    assert task_groups['toolkit/components/places/tests/unit/xpcshell.ini']
    # NOTE(review): the listing is cut off ("...") inside the next assert; only
    # its closing bracket has been restored. Anything that followed it is not
    # visible here.
    assert task_groups[
        'toolkit/components/extensions/test/xpcshell/xpcshell-e10s.ini:toolkit/components/extensions/test/xpcshell/xpcshell-common-e10s.ini'
    ]
# The page continues with the next snippet, titled "com.vmware.cis.tagging.category".
Source:com.vmware.cis.tagging.category  
#!/usr/bin/env python3
"""Report how often recent VMware integration builds hit the missing tagging service.

The script queries the Zuul dashboard for the latest builds of a fixed list
of jobs, keeps the SUCCESS/FAILURE builds that ended within ``MAX_HOURS``,
scans the output of failed builds for the patterns in ``COMMON_PATTERNS``
and prints success rates per host, per availability zone and per age in
hours. Failures that do not match ``MISSING_TAGGING_SERVICE`` are ignored.
"""
from pprint import pprint
import re
from datetime import datetime
from datetime import timezone

import dateutil.parser
import requests
import yaml

MAX_HOURS = 12
# Seconds to wait for any single HTTP request before giving up.
REQUEST_TIMEOUT = 30
# Only these build results are tallied; every table below has exactly these keys.
COUNTED_RESULTS = ("SUCCESS", "FAILURE")
COMMON_PATTERNS = (
    (
        "MISSING_TAGGING_SERVICE",
        re.compile(r"Cannot find service 'com.vmware.cis.tagging.category'"),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r"'Unable to communicate with the remote host, since it is disconnected.'"
        ),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r'Cannot complete login due to an incorrect user name or password."'
        ),
    ),
)
# Raw string: "\S" in a normal string literal is an invalid escape sequence.
FAILURE_IN_RE = re.compile(
    r".*NOTICE: To resume at this test target, use the option: --start-at (\S+)"
)


def get_inventory(log_url):
    """Download and parse ``zuul-info/inventory.yaml`` found under *log_url*."""
    r = requests.get(log_url + "zuul-info/inventory.yaml", timeout=REQUEST_TIMEOUT)
    return yaml.safe_load(r.text)


def search_error_pattern(log_url):
    """Scan ``job-output.txt`` under *log_url* for known error patterns.

    Returns a dict that may contain:

    * one counter per matched ``COMMON_PATTERNS`` name,
    * ``task_path``: the last "task path:" seen when a pattern name first
      matched, or when the "--start-at" notice was found,
    * ``failure_in``: the target named by the "--start-at" notice.
    """
    founds = {}
    r = requests.get(log_url + "job-output.txt", timeout=REQUEST_TIMEOUT)
    task_path = None
    for line in r.text.splitlines():
        if "task path:" in line:
            task_path = line.split(
                "/home/zuul/.ansible/collections/ansible_collections/community/vmware/"
            )[-1]
        for name, pattern in COMMON_PATTERNS:
            if pattern.search(line):
                if name not in founds:
                    founds[name] = 0
                    founds["task_path"] = task_path
                founds[name] += 1
        m = FAILURE_IN_RE.search(line)
        if m:
            founds["task_path"] = task_path
            founds["failure_in"] = m.group(1)
    return founds


def iter_builds():
    """Yield the builds returned by the Zuul API (limit 30) for each watched job."""
    jobs = [
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_1_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_2_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_with_nested-python36",
        "ansible-test-cloud-integration-vcenter_2esxi_without_nested-python36",
    ]
    url_template = "https://dashboard.zuul.ansible.com/api/tenant/ansible/builds?job_name={job}&limit=30"
    for job in jobs:
        r = requests.get(url_template.format(job=job), timeout=REQUEST_TIMEOUT)
        yield from r.json()


def _success_rate(outcomes):
    """Return successes / (successes + failures) for one SUCCESS/FAILURE table."""
    successes = len(outcomes["SUCCESS"])
    return successes / (successes + len(outcomes["FAILURE"]))


def _new_outcomes():
    """Return an empty SUCCESS/FAILURE table."""
    return {"SUCCESS": [], "FAILURE": []}


results_by_host_id = {}
results_by_region = {}
results_by_age = {}
host_by_host_id = {}
current = {}
for build in iter_builds():
    # Any other result (RETRY_LIMIT, SKIPPED, POST_FAILURE, ...) has no slot in
    # the tables below and would raise KeyError.
    if build["result"] not in COUNTED_RESULTS:
        continue
    # Both values are concatenated with strings below; None would raise TypeError.
    if not build.get("end_time") or not build.get("log_url"):
        continue
    delta = datetime.now(timezone.utc) - dateutil.parser.parse(build["end_time"] + 'Z')
    age = int(delta.total_seconds() / 3600)
    if age > MAX_HOURS:
        continue
    matches = None
    if build["result"] == "FAILURE":
        matches = search_error_pattern(build["log_url"])
        if "MISSING_TAGGING_SERVICE" not in matches:
            # Failures that do not match the tagging-service pattern are ignored.
            continue
        print("** https://github.com/ansible-collections/vmware/pull/{change}".format(
                **build
                ))
        print("Match: {log_url}".format(log_url=build["log_url"]))
    inventory = get_inventory(build["log_url"])
    nodepool = inventory["all"]["hosts"]["esxi1"]["nodepool"]
    cloud = nodepool["cloud"]
    region = nodepool["az"]
    host_id = nodepool["host_id"]
    if age not in results_by_age:
        results_by_age[age] = _new_outcomes()
    results_by_age[age][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches, "region": region}
    )
    if host_id not in results_by_host_id:
        host_by_host_id[host_id] = "{cloud}-{region}-{host_id}".format(
            cloud=cloud, region=region, host_id=host_id
        )
        results_by_host_id[host_id] = _new_outcomes()
    results_by_host_id[host_id][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    if region not in results_by_region:
        results_by_region[region] = _new_outcomes()
    results_by_region[region][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    # Only the first build seen for each change is remembered.
    if build["change"] not in current:
        current[build["change"]] = {
            "build": build,
            "log_url": build["log_url"],
            "matches": matches,
        }

print("BAD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id, outcomes in results_by_host_id.items():
    if len(outcomes["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = _success_rate(outcomes)
    if rate < 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("GOOD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id, outcomes in results_by_host_id.items():
    if len(outcomes["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = _success_rate(outcomes)
    # ">=" so a host at exactly 0.9 is listed here instead of in neither list.
    if rate >= 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("BY ZONE (for last {max_hours})".format(max_hours=MAX_HOURS))
for region, values in results_by_region.items():
    print("region={region} (rate={rate})".format(region=region, rate=_success_rate(values)))

print("BY AGE")
for age in sorted(results_by_age):
    values = results_by_age[age]
    # The listing shows the mojibake "â°" here; presumably U+23F0 (alarm clock).
    print("⏰ {age}h ago (rate={rate})".format(age=age, rate=_success_rate(values)))
    for failure in values["FAILURE"]:
        # NOTE(review): the trailing ")" in this output looks unintentional.
        print("    - {failure})".format(failure=failure))

for job in current.values():
    if not job["matches"]:
        continue
    if "MISSING_TAGGING_SERVICE" in job["matches"]:
        print(
            "Restart: https://github.com/ansible-collections/vmware/pull/{change}".format(
                **job["build"]
            )
        )
# NOTE(review): the listing is cut off ("...") inside the print call above; only
# its closing parenthesis has been restored.
# The page continues with the next snippet, titled "esxi_bad_compute_hosts".
Source:esxi_bad_compute_hosts  
#!/usr/bin/env python3
"""Report which compute hosts recent VMware integration builds failed on.

The script queries the Zuul dashboard for the latest builds of a fixed list
of jobs, keeps the SUCCESS/FAILURE builds that ended within ``MAX_HOURS``,
scans the output of failed builds for the ``HOST_KAPUT`` patterns in
``COMMON_PATTERNS`` and prints success rates per host, per availability zone
and per age in hours. Failures that do not match ``HOST_KAPUT`` are reported
and then ignored.
"""
from pprint import pprint
import re
from datetime import datetime
from datetime import timezone

import dateutil.parser
import requests
import yaml

MAX_HOURS = 24
# Seconds to wait for any single HTTP request before giving up.
REQUEST_TIMEOUT = 30
# Only these build results are tallied; every table below has exactly these keys.
COUNTED_RESULTS = ("SUCCESS", "FAILURE")
COMMON_PATTERNS = (
    (
        "HOST_KAPUT",
        re.compile(r"'An error occurred while communicating with the remote host.'"),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r"'Unable to communicate with the remote host, since it is disconnected.'"
        ),
    ),
    (
        "HOST_KAPUT",
        re.compile(
            r'Cannot complete login due to an incorrect user name or password."'
        ),
    ),
)
# Raw string: "\S" in a normal string literal is an invalid escape sequence.
FAILURE_IN_RE = re.compile(
    r".*NOTICE: To resume at this test target, use the option: --start-at (\S+)"
)


def get_inventory(log_url):
    """Download and parse ``zuul-info/inventory.yaml`` found under *log_url*."""
    r = requests.get(log_url + "zuul-info/inventory.yaml", timeout=REQUEST_TIMEOUT)
    return yaml.safe_load(r.text)


def search_error_pattern(log_url):
    """Scan ``job-output.txt`` under *log_url* for known error patterns.

    Returns a dict that may contain:

    * one counter per matched ``COMMON_PATTERNS`` name,
    * ``task_path``: the last "task path:" seen when a pattern name first
      matched, or when the "--start-at" notice was found,
    * ``failure_in``: the target named by the "--start-at" notice.
    """
    founds = {}
    r = requests.get(log_url + "job-output.txt", timeout=REQUEST_TIMEOUT)
    task_path = None
    for line in r.text.splitlines():
        if "task path:" in line:
            task_path = line.split(
                "/home/zuul/.ansible/collections/ansible_collections/community/vmware/"
            )[-1]
        for name, pattern in COMMON_PATTERNS:
            if pattern.search(line):
                if name not in founds:
                    founds[name] = 0
                    founds["task_path"] = task_path
                founds[name] += 1
        m = FAILURE_IN_RE.search(line)
        if m:
            founds["task_path"] = task_path
            founds["failure_in"] = m.group(1)
    return founds


def iter_builds():
    """Yield the builds returned by the Zuul API (limit 30) for each watched job."""
    jobs = [
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_1_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_without_nested-python36_2_of_2",
        "ansible-test-cloud-integration-vcenter_1esxi_with_nested-python36",
        "ansible-test-cloud-integration-vcenter_2esxi_without_nested-python36",
    ]
    url_template = "https://dashboard.zuul.ansible.com/api/tenant/ansible/builds?job_name={job}&limit=30"
    for job in jobs:
        r = requests.get(url_template.format(job=job), timeout=REQUEST_TIMEOUT)
        yield from r.json()


def _success_rate(outcomes):
    """Return successes / (successes + failures) for one SUCCESS/FAILURE table."""
    successes = len(outcomes["SUCCESS"])
    return successes / (successes + len(outcomes["FAILURE"]))


def _new_outcomes():
    """Return an empty SUCCESS/FAILURE table."""
    return {"SUCCESS": [], "FAILURE": []}


results_by_host_id = {}
results_by_region = {}
results_by_age = {}
host_by_host_id = {}
current = {}
for build in iter_builds():
    # Any other result (RETRY_LIMIT, SKIPPED, POST_FAILURE, ...) has no slot in
    # the tables below and would raise KeyError.
    if build["result"] not in COUNTED_RESULTS:
        continue
    # Both values are concatenated with strings below; None would raise TypeError.
    if not build.get("end_time") or not build.get("log_url"):
        continue
    delta = datetime.now(timezone.utc) - dateutil.parser.parse(build["end_time"] + 'Z')
    age = int(delta.total_seconds() / 3600)
    if age > MAX_HOURS:
        continue
    matches = None
    if build["result"] == "FAILURE":
        print("** https://github.com/ansible-collections/vmware/pull/{change}".format(
                **build
                ))
        matches = search_error_pattern(build["log_url"])
        if "HOST_KAPUT" not in matches:
            print("not matches for log_url: {log_url}".format(log_url=build["log_url"]))
            print(
                (
                    "The failure may not be an hypervisor stability problem. We "
                    "ignore it. ({failure_in} at {task_path})"
                ).format(
                    failure_in=matches.get("failure_in"),
                    task_path=matches.get("task_path"),
                )
            )
            continue
        print("Match: {log_url}".format(log_url=build["log_url"]))
    inventory = get_inventory(build["log_url"])
    nodepool = inventory["all"]["hosts"]["esxi1"]["nodepool"]
    cloud = nodepool["cloud"]
    region = nodepool["az"]
    host_id = nodepool["host_id"]
    if age not in results_by_age:
        results_by_age[age] = _new_outcomes()
    results_by_age[age][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches, "region": region}
    )
    if host_id not in results_by_host_id:
        host_by_host_id[host_id] = "{cloud}-{region}-{host_id}".format(
            cloud=cloud, region=region, host_id=host_id
        )
        results_by_host_id[host_id] = _new_outcomes()
    results_by_host_id[host_id][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    if region not in results_by_region:
        results_by_region[region] = _new_outcomes()
    results_by_region[region][build["result"]].append(
        {"log_url": build["log_url"], "matches": matches}
    )
    # Only the first build seen for each change is remembered.
    if build["change"] not in current:
        current[build["change"]] = {
            "build": build,
            "log_url": build["log_url"],
            "matches": matches,
        }

print("BAD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id, outcomes in results_by_host_id.items():
    if len(outcomes["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = _success_rate(outcomes)
    if rate < 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("GOOD HOSTS (for last {max_hours})".format(max_hours=MAX_HOURS))
for host_id, outcomes in results_by_host_id.items():
    if len(outcomes["SUCCESS"]) < 2:
        # Not enough metrics
        continue
    rate = _success_rate(outcomes)
    # ">=" so a host at exactly 0.9 is listed here instead of in neither list.
    if rate >= 0.9:
        print(
            "name={name} (rate={rate})".format(name=host_by_host_id[host_id], rate=rate)
        )

print("BY ZONE (for last {max_hours})".format(max_hours=MAX_HOURS))
for region, values in results_by_region.items():
    print("region={region} (rate={rate})".format(region=region, rate=_success_rate(values)))

print("BY AGE")
for age in sorted(results_by_age):
    values = results_by_age[age]
    # The listing shows the mojibake "â°" here; presumably U+23F0 (alarm clock).
    print("⏰ {age}h ago (rate={rate})".format(age=age, rate=_success_rate(values)))
    for failure in values["FAILURE"]:
        # NOTE(review): the trailing ")" in this output looks unintentional.
        print("    - {failure})".format(failure=failure))

for job in current.values():
    if not job["matches"]:
        continue
    if "HOST_KAPUT" in job["matches"]:
        print(
            "Restart: https://github.com/ansible-collections/vmware/pull/{change}".format(
                **job["build"]
            )
        )
# NOTE(review): the listing is cut off ("...") inside the print call above; only
# its closing parenthesis has been restored.
# The page continues with the next snippet, titled
# "slack_callback_functions_with_partial.py".
Source:slack_callback_functions_with_partial.py  
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.hooks.base import BaseHook
from airflow.operators.python import get_current_context
import traceback

# Follow Option #2 outlined here
# https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
# in order to set up Slack HTTP webhook.
#
# Every function below reads the Slack connection id from the keyword
# argument ``http_conn_id`` (presumably bound with ``functools.partial``, as
# the file name suggests). The connection's password is used as the webhook
# token. Whitespace inside the message templates, including trailing spaces,
# is part of the text that gets sent and is kept as in the original.


def _format_exception(exception):
    """Return *exception* as a stripped traceback string.

    The arguments are passed positionally because the ``etype`` keyword was
    removed in Python 3.10. Anything that is not an exception instance
    (for example ``None`` or a plain message) is rendered with ``str()``
    instead of failing on the missing ``__traceback__``.
    """
    if isinstance(exception, BaseException):
        return ''.join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        ).strip()
    return str(exception)


def _send_slack_message(context, slack_conn_id, slack_msg):
    """Send *slack_msg* through a ``SlackWebhookOperator`` and return its result."""
    slack_webhook_token = BaseHook.get_connection(slack_conn_id).password
    slack_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=slack_conn_id,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
    )
    return slack_alert.execute(context=context)


def dag_triggered_callback(context, **kwargs):
    """Post a "DAG has been triggered" message for the task instance in *context*."""
    ti = context.get("task_instance")
    slack_msg = f"""
            :airflow-new: DAG has been triggered. 
            *Task*: {ti.task_id}  
            *DAG*: {ti.dag_id} 
            *Execution Time*: {context.get('execution_date')}  
            <{ti.log_url}| *Log URL*>
            """
    return _send_slack_message(context, kwargs["http_conn_id"], slack_msg)


def dag_success_callback(context, **kwargs):
    """Post a "DAG has succeeded" message for the task instance in *context*."""
    ti = context.get("task_instance")
    slack_msg = f"""
            :airflow-new: DAG has succeeded. 
            *Task*: {ti.task_id}  
            *DAG*: {ti.dag_id} 
            *Execution Time*: {context.get('execution_date')}  
            <{ti.log_url}| *Log URL*>
            """
    return _send_slack_message(context, kwargs["http_conn_id"], slack_msg)


def success_callback(context, **kwargs):
    """Post a "Task has succeeded" message for the task instance in *context*."""
    ti = context.get("task_instance")
    slack_msg = f"""
            :white_check_mark: Task has succeeded. 
            *Task*: {ti.task_id}  
            *DAG*: {ti.dag_id} 
            *Execution Time*: {context.get('execution_date')}  
            <{ti.log_url}| *Log URL*>
            """
    return _send_slack_message(context, kwargs["http_conn_id"], slack_msg)


def failure_callback(context, **kwargs):
    """Post a "Task has failed" message that includes the formatted exception."""
    ti = context.get("task_instance")
    formatted_exception = _format_exception(context.get('exception'))
    slack_msg = f"""
            :x: Task has failed. 
            *Task*: {ti.task_id}
            *DAG*: {ti.dag_id} 
            *Execution Time*: {context.get('execution_date')}  
            *Exception*: {formatted_exception}
            <{ti.log_url}| *Log URL*>
            """
    return _send_slack_message(context, kwargs["http_conn_id"], slack_msg)


def retry_callback(context, **kwargs):
    """Post a "Task is retrying" message with the try count and the exception."""
    ti = context.get("task_instance")
    formatted_exception = _format_exception(context.get('exception'))
    slack_msg = f"""
            :sos: Task is retrying.
            *Task*: {ti.task_id}
            *Try number:* {ti.try_number - 1} out of {ti.max_tries + 1}. 
            *DAG*: {ti.dag_id}
            *Execution Time*: {context.get('execution_date')}
            *Exception*: {formatted_exception}
            <{ti.log_url}| *Log URL*>
            """
    return _send_slack_message(context, kwargs["http_conn_id"], slack_msg)


def slack_test(**kwargs):
    """Send a test message using the context returned by ``get_current_context()``."""
    context = get_current_context()
    ti = context.get("task_instance")
    slack_msg = f"""
            :airflow-spin-new: This is a test for sending a slack message via a PythonOperator.
            *Task*: {ti.task_id}
            *DAG*: {ti.dag_id}
            *Execution Time*: {context.get('execution_date')}
            <{ti.log_url}| *Log URL*>
            """
    return _send_slack_message(context, kwargs["http_conn_id"], slack_msg)


def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis, *args, **kwargs):
    """Build the Slack hook for an "SLA has been missed" message.

    Task, DAG and execution date are taken from the first entry of *slas*.
    """
    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()
    http_conn_id = kwargs["http_conn_id"]
    hook = SlackWebhookHook(
        http_conn_id=http_conn_id,
        webhook_token=BaseHook.get_connection(http_conn_id).password,
        message=f"""
            :sos: *SLA has been missed*
            *Task:* {task_id}
            *DAG:* {dag_id}
            *Execution Date:* {execution_date}
            """,
    )
    # NOTE(review): the listing is cut off ("...") right after the hook is
    # built; the statement that actually sends the message is not visible here.
# The page text resumes after the truncation with the words "Learn to execute".
automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
