Best Python code snippet using localstack_python
app.py
Source:app.py  
1#!/usr/bin/env python2# Cron-style schedule for when to run checks against PD3# every minute:4CHECK_SCHEDULE = "* * * * *"5#6# every hour at :00:7# CHECK_SCHEDULE = "0 * * * *"8#9# every hour at :00, :15, :30, and :60:10# CHECK_SCHEDULE = "*/15 * * * *"11#12# Check out https://crontab.guru if you need help with cron expressions13# Optional email of a user to assign test incidents to. This user will receive14# low-urgency notifications of test incident whenever the check is run. This15# user must exist or else the 'rest' check will fail16#17# Set it to None if you just want to get the first user alphabetically in the domain.18#19CHECK_USER = None20# Optional ID of a team within which to create Services and Escalation21# Policies (and hence Incidents) - use this with a private team ID to hide22# PDprobe-generated incidents from all non-admin users.23TEAM_ID = None24import os25import time26from datetime import datetime27import requests28import pd29import json30import secrets31import threading32# for making tunnels with external URLs so we can listen for webhooks33from pyngrok import ngrok34# cron scheduler35from apscheduler.schedulers.background import BackgroundScheduler36from apscheduler.triggers.cron import CronTrigger37# webhook listener38from flask import Flask, request39app = Flask(__name__)40def report_results(check_results):41    # This is where you would customize what to do with test results. 42    # You could send to a dashboarding system, or do further processing...43    # 44    # check_results will be a JSON structure that looks like this:45    # {46    #     "time_started": <UTC time when the test was started, in ISO8601 format>,47    #     "rest": <one of "success", "fail", "not tested">,48    #     "events": <one of "success", "fail", "not tested">,49    #     "webhooks": <one of "success", "fail", "not tested">,50    #     "errors": [an array of strings representing any errors that were encountered while testing],51    #     "time_ended": <UTC time when the test was ended, in ISO8601 format>,52    # }53    #54    # Example for a successful test:55    # {56    #     "time_started": "2020-03-10T15:11:00Z",57    #     "rest": "success",58    #     "events": "success",59    #     "webhooks": "success",60    #     "errors": []61    # }62    #63    # Example for a test where webhooks didn't arrive:64    #65    # {66    #     "time_started": "2020-03-12T14:57:00Z",67    #     "rest": "success",68    #     "events": "success",69    #     "webhooks": "fail",70    #     "errors": [71    #         "Timed out waiting for webhook"72    #     ],73    #     "time_ended": "2020-03-12T14:57:14Z"74    # }75    #76    print("PD check results:")77    print("----------------------------------")78    print(json.dumps(check_results, indent=4))79    print("----------------------------------")80@app.route('/', methods=['GET', 'POST'])81def index():82    global check_results83    token = os.environ.get('TOKEN')84    # just look for an incident.trigger event85    if request.method == 'POST':86        try:87            content = request.get_json(force=True)88            event = content['messages'][0]['event']89            service_name = content['messages'][0]['incident']['service']['name']90            print(f"Got {event} on service {service_name}")91            if event == "incident.trigger":92                check_results[service_name]['webhooks'] = 'success'93                teardown(service_name, token)94        except Exception as e:95            print(f"oops! {e}")96            pass97    return 'ok'98def create_escalation_policy(token, name):99    """ create an escalation policy in PD """100    users = pd.request(token=token, endpoint="users", params={"limit": 1, "query": CHECK_USER})101    user = users['users'][0]102    body = {103        "escalation_policy": {104            "type": "escalation_policy",105            "name": name,106            "escalation_rules": [107                {108                    "escalation_delay_in_minutes": 10,109                    "targets": [110                        {111                            "id": user['id'],112                            "type": "user_reference"113                        }114                    ]115                }116            ],117            "description": "PDprobe transient"118        }119    }120    if TEAM_ID:121        body['teams'] = [122            {123                "type": "team_reference",124                "id": TEAM_ID125            }126        ]127    return pd.request(token=token, endpoint="escalation_policies", method="POST", data=body)128def create_service(token, name, ep_id):129    """ create a service in PD """130    body = {131        "service": {132            "type": "service",133            "name": name,134            "escalation_policy": {135                "id": ep_id,136                "type": "escalation_policy_reference"137            },138            "incident_urgency_rule": {139                "type": "constant",140                "urgency": "low"141            },142            "alert_creation": "create_alerts_and_incidents"143        }144    }145    if TEAM_ID:146        body['teams'] = [147            {148                "type": "team_reference",149                "id": TEAM_ID150            }151        ]152    return pd.request(token=token, endpoint="services", method="POST", data=body)153def create_integration(token, service_id):154    """ create an integration in a PD service """155    body = {156        "type": "events_api_v2_inbound_integration",157        "name": "PDprobe",158    }159    return pd.request(token=token, endpoint=f"services/{service_id}/integrations", method="POST", data=body)160def create_webhook(token, name, service_id, public_url):161    """ create a webhook in a PD service """162    body = {163        "webhook": {164            "type": "webhook_reference",165            "name": name,166            "endpoint_url": public_url,167            "webhook_object": {168                "id": service_id,169                "type": "service_reference"170            },171            "outbound_integration": {172                "id": "PJFWPEP",173                "type": "outbound_integration"174            }175        }176    }177    return pd.request(token=token, endpoint=f"webhooks", method="POST", data=body)178def send_trigger(routing_key, dedup_key):179    """ send a trigger alert """180    payload = {181        "payload": {182            "summary": f"Test {dedup_key}",183            "source": f"{dedup_key}",184            "severity": "critical",185        },186        "routing_key": routing_key,187        "dedup_key": dedup_key,188        "event_action": "trigger"189    }190    return pd.send_v2_event(payload)191def send_resolve(routing_key, dedup_key):192    """ send a resolve alert """193    payload = {194        "payload": {195            "summary": f"Test {dedup_key}",196            "source": f"{dedup_key}",197            "severity": "critical",198        },199        "routing_key": routing_key,200        "dedup_key": dedup_key,201        "event_action": "resolve"202    }203    return pd.send_v2_event(payload)204def destroy_escalation_policy(token, ep_id):205    """ destroy an escalation policy in PD """206    return pd.request(token=token, endpoint=f"escalation_policies/{ep_id}", method="DELETE")207def destroy_service(token, ep_id):208    """ destroy a service in PD """209    return pd.request(token=token, endpoint=f"services/{ep_id}", method="DELETE")210def teardown(name, token):211    global checks212    global check_results213    global timers214    if name in check_results:215        check_results[name]['time_ended'] = datetime.utcnow().replace(microsecond=0).isoformat() + 'Z'216        if check_results[name]['events'] == 'success' and check_results[name]['webhooks'] == 'not tested':217            # we sent an event but didn't get a webhook218            check_results[name]['webhooks'] = 'fail'219            check_results[name]['errors'].append("Timed out waiting for webhook")220        report_results(check_results[name])221        del check_results[name]222    if name in timers and isinstance(timers[name], threading.Timer):223        timers[name].cancel()224        del timers[name]225    if name in checks:226        if 'service_id' in checks[name]:227            print(f"Destroying service {checks[name]['service_id']}")228            destroy_service(token, checks[name]['service_id'])229        if 'ep_id' in checks[name]:230            print(f"Destroying escalation policy {checks[name]['ep_id']}")231            destroy_escalation_policy(token, checks[name]['ep_id'])232        del checks[name]233def check_pd():234    """ check all the PD things """235    # make up a unique name for created objects236    name = f"PDprobe-{secrets.token_hex(32)}"237    token = os.environ.get('TOKEN')238    service_id = None239    ep_id = None240    global checks241    global check_results242    global timers243    check_results[name] = {244        'time_started': datetime.utcnow().replace(microsecond=0).isoformat() + 'Z',245        'rest': 'not tested',246        'events': 'not tested',247        'webhooks': 'not tested',248        'errors': []249    }250    try:251        # create an EP252        print(f"Creating escalation policy {name}")253        ep = create_escalation_policy(token=token, name=name)254        ep_id = ep['escalation_policy']['id']255        print(f"Created EP {ep['escalation_policy']['name']}")256        # create a service257        print(f"Creating service {name}")258        service = create_service(token=token, name=name, ep_id=ep_id)259        service_id = service['service']['id']260        print(f"Created service {service['service']['name']}")261        # add a v2 integration262        print(f"Adding integration")263        integration = create_integration(token=token, service_id=service_id)264        routing_key = integration['integration']['integration_key']265        print(f"Added integration with key {routing_key}")266        # add a webhook267        print(f"Adding webhook {name}")268        webhook = create_webhook(token, name, service_id, public_url)269        print(f"Added webhook {webhook['webhook']['name']}")270        if ep_id and service_id and routing_key:271            check_results[name]['rest'] = 'success'272            checks[name] = {273                "service_id": service_id,274                "ep_id": ep_id275            }276        else:277            check_results[name]['rest'] = 'fail'278    except Exception as e:279        check_results[name]['rest'] = 'fail'280        check_results[name]['errors'].append(str(e))281    try:282        if routing_key:283            # send an event284            print(f"Sending trigger to {routing_key}")285            trigger_response = send_trigger(routing_key=routing_key, dedup_key=name)286            if trigger_response['status'] == 'success' and trigger_response['dedup_key'] == name:287                check_results[name]['events'] = 'success'288            else:289                check_results[name]['events'] = 'fail'290                check_results[name]['errors'].append(str(trigger_response))291            print(f"Sending resolve to {routing_key}")292            resolve_response = send_resolve(routing_key=routing_key, dedup_key=name)293            if resolve_response['status'] == 'success' and resolve_response['dedup_key'] == name:294                pass295            else:296                check_results[name]['events'] = 'fail'297                check_results[name]['errors'].append(str(resolve_response))298    except:299        check_results[name]['events'] = 'fail'300    301    # destroy everything later302    timers[name] = threading.Timer(10.0, teardown, kwargs={"name": name, "token": token})303    timers[name].start()304checks = {}305check_results = {}306timers = {}307# Make a public URL to tunnel to this webhook listener308ngrok.connect(5000)309tunnels = ngrok.get_tunnels()310public_url = tunnels[0].public_url311print(f"Webhook listener public url is {public_url}")312# check PD on a schedule313scheduler = BackgroundScheduler()314scheduler.add_job(check_pd, CronTrigger.from_crontab(CHECK_SCHEDULE))315scheduler.start()316if __name__ == '__main__':...test_readcgcef.py
Source:test_readcgcef.py  
...21        return p.communicate()22    def run_read(self, flag, testfile):23        return self.run_cmd(['readcgcef', flag, "--wide",24                             os.path.join('files', testfile)])25    def check_results(self, results, expected):26        expected_re = re.compile(expected)27        for line in results:28            if expected_re.search(line):29                return30        # KLUDGE alert. Just throw an error31        self.assertEqual(expected, results)32    def test_good(self):33        results = self.run_read('-d', 'cgc-testcase')34        self.check_results(results, "There is no dynamic section in this file")35        results = self.run_read('-e', 'cgc-testcase')36        self.check_results(results, "LOAD\s+0x000000 0x08048000 0x08048000")37        results = self.run_read('-h', 'cgc-testcase')38        self.check_results(results, "Magic:\s+7f 43 47 43 01 01 01 43 01 4d 65 72 69 6e 6f 00")39        results = self.run_read('-l', 'cgc-testcase')40        self.check_results(results, "LOAD\s+0x000000 0x08048000 0x08048000")41        self.check_results(results, "Entry point 0x8048")42        self.check_results(results,43                           "CGCEf file type is EXEC \(Executable file\)")44        results = self.run_read('-n', 'cgc-testcase')45        self.assertEqual(results[0], "")46        results = self.run_read('-r', 'cgc-testcase')47        self.assertEqual(results[0], "")48        results = self.run_read('-s', 'cgc-testcase')49        self.check_results(results, "FUNC\s+GLOBAL\s+[0-9]\s+main")50        self.check_results(results, "FUNC\s+GLOBAL\s+[0-9]\s+_start")51        self.check_results(results, "NOTYPE\s+GLOBAL\s+[0-9]\s+_end")52        results = self.run_read('-t', 'cgc-testcase')53        self.assertEqual(results[0], "")54        results = self.run_read('-D', 'cgc-testcase')55        self.assertEqual(results[0], "")56        results = self.run_read('-S', 'cgc-testcase')57        self.check_results(results, "[[\s*[0-9]+\]\s+\.text\s+PROGBITS\s+[0-9a-f]+")58        self.check_results(results, "[[\s*[0-9]+\]\s+\.shstrtab\s+STRTAB\s+[0-9a-f]+")59        self.check_results(results, "[[\s*[0-9]+\]\s+\.strtab\s+STRTAB\s+[0-9a-f]+")60        self.check_results(results, "[[\s*[0-9]+\]\s+\.symtab\s+SYMTAB\s+[0-9a-f]+")61if __name__ == '__main__':62    unittest.main()...apk_private.py
Source:apk_private.py  
1# ! /usr/bin/env python2# -*- coding:utf-8 -*-3'''4Created on 2016-11-85@author: danny.deng6'''7from app.utils.apk import apk_check8def checkapk(apkpath):9    """10    æ£æ¥apkï¼è¿åå
ä½ä¿¡æ¯åæ£æ¥ç»æ11    :param apkpath: apkè·¯å¾12    :return: dictï¼ dict13    """14    check_obj = apk_check.CheckApk(apkpath)15    # è·åApkåºæ¬ä¿¡æ¯16    rst_data = get_apk_info(check_obj)17    # è·åæ£æ¥ç»æ18    check_results = get_check_result(check_obj)19    return rst_data, check_results20def get_apk_info(check_obj):21    data = {}22    # è·åApkå
ä½åºæ¬ä¿¡æ¯23    data['packagename'] = check_obj.apkobj.get_package()24    data['versioncode'] = check_obj.apkobj.get_androidversion_code()25    data['versionname'] = check_obj.apkobj.get_androidversion_name()26    data['appname'] = check_obj.apkobj.get_app_name()27    return data28def get_check_result(check_obj):29    check_results = {}30    # æ£æ¥æä»¶31    check_results = dict(check_results, **check_obj.check_apk_include_file())32    # æ£æ¥å®è£
33    check_results = dict(check_results, **check_obj.check_installLocation())34    # æ£æ¥æéé
ç½®35    check_results = dict(check_results, **check_obj.check_manifest_include_arg('uses-permission'))36    # æ£æ¥meta-data37    check_results = dict(check_results, **check_obj.check_manifest_include_arg('meta-data'))38    # æ£æ¥element屿§39    check_results = dict(check_results, **check_obj.check_manifest_include_element())40    # æ£æ¥intent-filteré
ç½®41    check_results = dict(check_results, **check_obj.check_manifest_include_intent())42    # æ£æ¥activityé
ç½®43    check_results = dict(check_results, **check_obj.check_manifest_include_arg('activity'))44    # æ£æ¥service45    check_results = dict(check_results, **check_obj.check_manifest_include_arg('service'))46    # æ£æ¥receiver47    check_results = dict(check_results, **check_obj.check_manifest_include_arg('receiver'))48    # ç¹æ®æ£æ¥ç¹49    check_results = dict(check_results, **check_obj.check_customized())...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
