How to use check_results method in localstack

Best Python code snippet using localstack_python

app.py

Source:app.py Github

copy

Full Screen

#!/usr/bin/env python
"""PDprobe: periodically exercise PagerDuty's REST API, Events API and webhooks.

On every scheduled run, ``check_pd`` creates a throwaway escalation policy,
service, Events API v2 integration and webhook, sends a trigger/resolve pair,
and waits for the resulting webhook to reach the Flask listener below.  The
outcome of each stage is handed to ``report_results`` and the temporary
objects are destroyed again by ``teardown``.
"""

import os
import time
from datetime import datetime, timezone
import requests
import pd
import json
import secrets
import threading

# for making tunnels with external URLs so we can listen for webhooks
from pyngrok import ngrok

# cron scheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

# webhook listener
from flask import Flask, request

# Cron-style schedule for when to run checks against PD
# every minute:
CHECK_SCHEDULE = "* * * * *"
#
# every hour at :00:
# CHECK_SCHEDULE = "0 * * * *"
#
# every hour at :00, :15, :30, and :45:
# CHECK_SCHEDULE = "*/15 * * * *"
#
# Check out https://crontab.guru if you need help with cron expressions

# Optional email of a user to assign test incidents to. This user will receive
# low-urgency notifications of test incident whenever the check is run. This
# user must exist or else the 'rest' check will fail
#
# Set it to None if you just want to get the first user alphabetically in the domain.
#
CHECK_USER = None

# Optional ID of a team within which to create Services and Escalation
# Policies (and hence Incidents) - use this with a private team ID to hide
# PDprobe-generated incidents from all non-admin users.
TEAM_ID = None

app = Flask(__name__)

# Per-check bookkeeping, all keyed by the unique PDprobe object name.
checks = {}         # IDs of the PD objects created for the check (for cleanup)
check_results = {}  # result record that is eventually passed to report_results
timers = {}         # threading.Timer that tears the check down after a timeout


def _utc_timestamp():
    """Return the current UTC time as ISO 8601 with second precision and a 'Z' suffix."""
    return datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def report_results(check_results):
    """Do something with the results of one finished check.

    This is where you would customize what to do with test results.
    You could send to a dashboarding system, or do further processing...

    check_results will be a JSON structure that looks like this:
    {
        "time_started": <UTC time when the test was started, in ISO8601 format>,
        "rest": <one of "success", "fail", "not tested">,
        "events": <one of "success", "fail", "not tested">,
        "webhooks": <one of "success", "fail", "not tested">,
        "errors": [an array of strings representing any errors that were encountered while testing],
        "time_ended": <UTC time when the test was ended, in ISO8601 format>,
    }

    Example for a successful test:
    {
        "time_started": "2020-03-10T15:11:00Z",
        "rest": "success",
        "events": "success",
        "webhooks": "success",
        "errors": [],
        "time_ended": "2020-03-10T15:11:04Z"
    }

    Example for a test where webhooks didn't arrive:
    {
        "time_started": "2020-03-12T14:57:00Z",
        "rest": "success",
        "events": "success",
        "webhooks": "fail",
        "errors": [
            "Timed out waiting for webhook"
        ],
        "time_ended": "2020-03-12T14:57:14Z"
    }
    """
    print("PD check results:")
    print("----------------------------------")
    print(json.dumps(check_results, indent=4))
    print("----------------------------------")


@app.route('/', methods=['GET', 'POST'])
def index():
    """Webhook listener: mark the webhook stage successful on incident.trigger.

    Any POST whose first message is an ``incident.trigger`` event records
    'success' for the check named after the incident's service and tears
    that check down immediately.  Malformed or unexpected payloads are
    logged and otherwise ignored; the response is always 'ok'.
    """
    token = os.environ.get('TOKEN')
    # just look for an incident.trigger event
    if request.method == 'POST':
        try:
            content = request.get_json(force=True)
            message = content['messages'][0]
            event = message['event']
            service_name = message['incident']['service']['name']
            print(f"Got {event} on service {service_name}")
            if event == "incident.trigger":
                check_results[service_name]['webhooks'] = 'success'
                teardown(service_name, token)
        except Exception as e:
            # Best effort: a bad payload must never break the listener.
            print(f"oops! {e}")
    return 'ok'


def create_escalation_policy(token, name):
    """Create an escalation policy in PD targeting the CHECK_USER (or first) user.

    :param token: PD REST API token
    :param name: name for the new escalation policy
    :return: the response of ``pd.request`` for the POST
    :raises IndexError: if the user query returns no users
    """
    users = pd.request(token=token, endpoint="users", params={"limit": 1, "query": CHECK_USER})
    user = users['users'][0]
    policy = {
        "type": "escalation_policy",
        "name": name,
        "escalation_rules": [
            {
                "escalation_delay_in_minutes": 10,
                "targets": [
                    {
                        "id": user['id'],
                        "type": "user_reference"
                    }
                ]
            }
        ],
        "description": "PDprobe transient"
    }
    if TEAM_ID:
        # The team list belongs inside the "escalation_policy" object; at the
        # top level of the request body it has no effect.
        policy['teams'] = [
            {
                "type": "team_reference",
                "id": TEAM_ID
            }
        ]
    body = {"escalation_policy": policy}
    return pd.request(token=token, endpoint="escalation_policies", method="POST", data=body)


def create_service(token, name, ep_id):
    """Create a low-urgency service in PD attached to escalation policy *ep_id*.

    :param token: PD REST API token
    :param name: name for the new service
    :param ep_id: ID of the escalation policy the service should use
    :return: the response of ``pd.request`` for the POST
    """
    body = {
        "service": {
            "type": "service",
            "name": name,
            "escalation_policy": {
                "id": ep_id,
                "type": "escalation_policy_reference"
            },
            "incident_urgency_rule": {
                "type": "constant",
                "urgency": "low"
            },
            "alert_creation": "create_alerts_and_incidents"
        }
    }
    if TEAM_ID:
        # NOTE(review): this puts "teams" at the top level of the request body,
        # not inside "service" - confirm against the PD API whether the service
        # should carry the team itself or inherit it from its escalation policy.
        body['teams'] = [
            {
                "type": "team_reference",
                "id": TEAM_ID
            }
        ]
    return pd.request(token=token, endpoint="services", method="POST", data=body)


def create_integration(token, service_id):
    """Create an Events API v2 inbound integration on service *service_id*.

    :return: the response of ``pd.request`` for the POST
    """
    body = {
        "type": "events_api_v2_inbound_integration",
        "name": "PDprobe",
    }
    return pd.request(token=token, endpoint=f"services/{service_id}/integrations", method="POST", data=body)


def create_webhook(token, name, service_id, public_url):
    """Create a webhook on service *service_id* that posts to *public_url*.

    :return: the response of ``pd.request`` for the POST
    """
    body = {
        "webhook": {
            "type": "webhook_reference",
            "name": name,
            "endpoint_url": public_url,
            "webhook_object": {
                "id": service_id,
                "type": "service_reference"
            },
            "outbound_integration": {
                # NOTE(review): hard-coded outbound integration ID - presumably
                # PD's generic webhook integration; verify.
                "id": "PJFWPEP",
                "type": "outbound_integration"
            }
        }
    }
    return pd.request(token=token, endpoint="webhooks", method="POST", data=body)


def _send_event(routing_key, dedup_key, event_action):
    """Send one Events API v2 event with the shared PDprobe test payload."""
    payload = {
        "payload": {
            "summary": f"Test {dedup_key}",
            "source": f"{dedup_key}",
            "severity": "critical",
        },
        "routing_key": routing_key,
        "dedup_key": dedup_key,
        "event_action": event_action
    }
    return pd.send_v2_event(payload)


def send_trigger(routing_key, dedup_key):
    """Send a trigger alert and return the response of ``pd.send_v2_event``."""
    return _send_event(routing_key, dedup_key, "trigger")


def send_resolve(routing_key, dedup_key):
    """Send a resolve alert and return the response of ``pd.send_v2_event``."""
    return _send_event(routing_key, dedup_key, "resolve")


def destroy_escalation_policy(token, ep_id):
    """Destroy the escalation policy *ep_id* in PD."""
    return pd.request(token=token, endpoint=f"escalation_policies/{ep_id}", method="DELETE")


def destroy_service(token, ep_id):
    """Destroy a service in PD.

    Despite its name, *ep_id* is the ID of the service to delete; the
    parameter name is kept for backward compatibility.
    """
    return pd.request(token=token, endpoint=f"services/{ep_id}", method="DELETE")


def _destroy_quietly(label, destroy, token, object_id):
    """Run one destroy call, logging (not raising) on failure so cleanup continues."""
    print(f"Destroying {label} {object_id}")
    try:
        destroy(token, object_id)
    except Exception as e:
        print(f"Failed to destroy {label} {object_id}: {e}")


def teardown(name, token):
    """Finish the check called *name*: report its results and destroy its PD objects.

    Called either by the webhook listener as soon as the expected webhook
    arrives, or by the per-check timer when it does not.  Each step is
    skipped if there is nothing recorded under *name*.

    :param name: unique PDprobe object name identifying the check
    :param token: PD REST API token used for the DELETE calls
    """
    if name in check_results:
        result = check_results[name]
        result['time_ended'] = _utc_timestamp()
        if result['events'] == 'success' and result['webhooks'] == 'not tested':
            # we sent an event but didn't get a webhook
            result['webhooks'] = 'fail'
            result['errors'].append("Timed out waiting for webhook")
        report_results(result)
        del check_results[name]

    if name in timers and isinstance(timers[name], threading.Timer):
        timers[name].cancel()
        del timers[name]

    if name in checks:
        # Drop the bookkeeping entry first so it cannot be left behind, then
        # delete the service before the escalation policy it references.
        created = checks.pop(name)
        if 'service_id' in created:
            _destroy_quietly("service", destroy_service, token, created['service_id'])
        if 'ep_id' in created:
            _destroy_quietly("escalation policy", destroy_escalation_policy, token, created['ep_id'])


def check_pd():
    """Run one full check of the PD REST API, Events API and webhooks.

    Creates uniquely named temporary objects, records the outcome of each
    stage in ``check_results``, and schedules ``teardown`` after 10 seconds
    in case the webhook never arrives.
    """
    # make up a unique name for created objects
    name = f"PDprobe-{secrets.token_hex(32)}"
    token = os.environ.get('TOKEN')
    service_id = None
    ep_id = None
    # Must exist even if the REST stage fails before the integration is made.
    routing_key = None

    result = {
        'time_started': _utc_timestamp(),
        'rest': 'not tested',
        'events': 'not tested',
        'webhooks': 'not tested',
        'errors': []
    }
    check_results[name] = result

    # Register created objects as soon as they exist so that teardown also
    # cleans up after a REST stage that fails half-way through.
    created = {}
    checks[name] = created

    try:
        # create an EP
        print(f"Creating escalation policy {name}")
        ep = create_escalation_policy(token=token, name=name)
        ep_id = ep['escalation_policy']['id']
        created['ep_id'] = ep_id
        print(f"Created EP {ep['escalation_policy']['name']}")

        # create a service
        print(f"Creating service {name}")
        service = create_service(token=token, name=name, ep_id=ep_id)
        service_id = service['service']['id']
        created['service_id'] = service_id
        print(f"Created service {service['service']['name']}")

        # add a v2 integration
        print("Adding integration")
        integration = create_integration(token=token, service_id=service_id)
        routing_key = integration['integration']['integration_key']
        print(f"Added integration with key {routing_key}")

        # add a webhook
        print(f"Adding webhook {name}")
        webhook = create_webhook(token, name, service_id, public_url)
        print(f"Added webhook {webhook['webhook']['name']}")

        if ep_id and service_id and routing_key:
            result['rest'] = 'success'
        else:
            result['rest'] = 'fail'
    except Exception as e:
        result['rest'] = 'fail'
        result['errors'].append(str(e))

    # Without a routing key there is nothing to send to; 'events' then stays
    # 'not tested' instead of being reported as a failure.
    if routing_key:
        try:
            # send an event
            print(f"Sending trigger to {routing_key}")
            trigger_response = send_trigger(routing_key=routing_key, dedup_key=name)
            if trigger_response['status'] == 'success' and trigger_response['dedup_key'] == name:
                result['events'] = 'success'
            else:
                result['events'] = 'fail'
                result['errors'].append(str(trigger_response))

            print(f"Sending resolve to {routing_key}")
            resolve_response = send_resolve(routing_key=routing_key, dedup_key=name)
            if not (resolve_response['status'] == 'success' and resolve_response['dedup_key'] == name):
                result['events'] = 'fail'
                result['errors'].append(str(resolve_response))
        except Exception as e:
            result['events'] = 'fail'
            result['errors'].append(str(e))

    # destroy everything later
    timers[name] = threading.Timer(10.0, teardown, kwargs={"name": name, "token": token})
    timers[name].start()


# Make a public URL to tunnel to this webhook listener
ngrok.connect(5000)
tunnels = ngrok.get_tunnels()
public_url = tunnels[0].public_url
print(f"Webhook listener public url is {public_url}")

# check PD on a schedule
scheduler = BackgroundScheduler()
scheduler.add_job(check_pd, CronTrigger.from_crontab(CHECK_SCHEDULE))
scheduler.start()

if __name__ == '__main__':
    # The listing is truncated at this point in the source page.
    ...

Full Screen

Full Screen

test_readcgcef.py

Source:test_readcgcef.py Github

copy

Full Screen

...21 return p.communicate()22 def run_read(self, flag, testfile):23 return self.run_cmd(['readcgcef', flag, "--wide",24 os.path.join('files', testfile)])25 def check_results(self, results, expected):26 expected_re = re.compile(expected)27 for line in results:28 if expected_re.search(line):29 return30 # KLUDGE alert. Just throw an error31 self.assertEqual(expected, results)32 def test_good(self):33 results = self.run_read('-d', 'cgc-testcase')34 self.check_results(results, "There is no dynamic section in this file")35 results = self.run_read('-e', 'cgc-testcase')36 self.check_results(results, "LOAD\s+0x000000 0x08048000 0x08048000")37 results = self.run_read('-h', 'cgc-testcase')38 self.check_results(results, "Magic:\s+7f 43 47 43 01 01 01 43 01 4d 65 72 69 6e 6f 00")39 results = self.run_read('-l', 'cgc-testcase')40 self.check_results(results, "LOAD\s+0x000000 0x08048000 0x08048000")41 self.check_results(results, "Entry point 0x8048")42 self.check_results(results,43 "CGCEf file type is EXEC \(Executable file\)")44 results = self.run_read('-n', 'cgc-testcase')45 self.assertEqual(results[0], "")46 results = self.run_read('-r', 'cgc-testcase')47 self.assertEqual(results[0], "")48 results = self.run_read('-s', 'cgc-testcase')49 self.check_results(results, "FUNC\s+GLOBAL\s+[0-9]\s+main")50 self.check_results(results, "FUNC\s+GLOBAL\s+[0-9]\s+_start")51 self.check_results(results, "NOTYPE\s+GLOBAL\s+[0-9]\s+_end")52 results = self.run_read('-t', 'cgc-testcase')53 self.assertEqual(results[0], "")54 results = self.run_read('-D', 'cgc-testcase')55 self.assertEqual(results[0], "")56 results = self.run_read('-S', 'cgc-testcase')57 self.check_results(results, "[[\s*[0-9]+\]\s+\.text\s+PROGBITS\s+[0-9a-f]+")58 self.check_results(results, "[[\s*[0-9]+\]\s+\.shstrtab\s+STRTAB\s+[0-9a-f]+")59 self.check_results(results, "[[\s*[0-9]+\]\s+\.strtab\s+STRTAB\s+[0-9a-f]+")60 self.check_results(results, "[[\s*[0-9]+\]\s+\.symtab\s+SYMTAB\s+[0-9a-f]+")61if __name__ == '__main__':62 
unittest.main()...

Full Screen

Full Screen

apk_private.py

Source:apk_private.py Github

copy

Full Screen

#!/usr/bin/env python
# -*- coding:utf-8 -*-
'''
Created on 2016-11-8
@author: danny.deng
'''
from app.utils.apk import apk_check


def checkapk(apkpath):
    """
    Check an APK and return its package information and the check results.

    :param apkpath: path to the APK
    :return: dict, dict
    """
    check_obj = apk_check.CheckApk(apkpath)
    # Collect the basic APK information
    rst_data = get_apk_info(check_obj)
    # Collect the check results
    check_results = get_check_result(check_obj)
    return rst_data, check_results


def get_apk_info(check_obj):
    """Return the package name, version code, version name and app name of the APK.

    :param check_obj: object whose ``apkobj`` attribute provides the getters used below
    :return: dict with keys 'packagename', 'versioncode', 'versionname', 'appname'
    """
    apk = check_obj.apkobj
    return {
        'packagename': apk.get_package(),
        'versioncode': apk.get_androidversion_code(),
        'versionname': apk.get_androidversion_name(),
        'appname': apk.get_app_name(),
    }


def get_check_result(check_obj):
    """Run every check on *check_obj* and merge the resulting dicts into one.

    Checks run in the order listed; if two checks produce the same key, the
    later one wins.
    """
    check_results = {}
    # Merging in place avoids copying the accumulated dict at every step and,
    # unlike ``dict(a, **b)``, does not require the keys to be strings.
    # Check files
    check_results.update(check_obj.check_apk_include_file())
    # Check install location
    check_results.update(check_obj.check_installLocation())
    # Check permission configuration
    check_results.update(check_obj.check_manifest_include_arg('uses-permission'))
    # Check meta-data
    check_results.update(check_obj.check_manifest_include_arg('meta-data'))
    # Check element attributes
    check_results.update(check_obj.check_manifest_include_element())
    # Check intent-filter configuration
    check_results.update(check_obj.check_manifest_include_intent())
    # Check activity configuration
    check_results.update(check_obj.check_manifest_include_arg('activity'))
    # Check service
    check_results.update(check_obj.check_manifest_include_arg('service'))
    # Check receiver
    check_results.update(check_obj.check_manifest_include_arg('receiver'))
    # Special (customized) check points
    check_results.update(check_obj.check_customized())
    # NOTE(review): the listing is truncated at this point in the source page;
    # presumably the function goes on to return check_results - confirm
    # against the full file.
    ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful