Best Python code snippet using ATX
server
Source:server  
#!/usr/bin/env python3
from sanic import Sanic
from sanic.response import html, redirect, text, raw
from sanic.exceptions import NotFound
import sys
import asyncio
import ssl
from pathlib import Path
# rookuu's imports
from base64 import b64decode, b64encode
from time import sleep
import requests
import json
import os
import subprocess
import plistlib
import random
import string
import threading

# Settings: one entry per Sanic app name (holding 'debug'), plus 'mythic_address'.
config = {}
# Known devices keyed by UDID; mirrored to AGENTS_FILE on every change.
agents = {}
mdm_host = None
lock = None

MICROMDM_API = "https://localhost"
MICROMDM_AUTH = ('micromdm', 'mythic')
AGENTS_FILE = "agents.json"
# Decoded Mythic responses carry a 36-character UUID prefix before the JSON body.
MYTHIC_UUID_LENGTH = 36

# Mythic commands that map to an MDM request with no extra arguments.
SIMPLE_REQUEST_TYPES = {
    "certificate_list": "CertificateList",
    "installed_application_list": "InstalledApplicationList",
    "profile_list": "ProfileList",
    "provisioning_profile_list": "ProvisioningProfileList",
    "security_info": "SecurityInfo",
}


async def print_flush(message):
    """Print *message* and flush stdout immediately."""
    print(message)
    sys.stdout.flush()


def server_error_handler(request, exception):
    """Build the HTML error response used for failed requests.

    Returns a 500 response when Sanic could not even build a request object,
    otherwise a 404 naming the requested URL.
    """
    if request is None:
        print("Invalid HTTP Method - Likely HTTPS trying to talk to HTTP")
        sys.stdout.flush()
        return html("Error: Failed to process request", status=500, headers={})
    # The per-app config only ever receives a 'debug' key in this file, so
    # indexing 'headers' directly raised KeyError; fall back to no headers.
    headers = config.get(request.app.name, {}).get('headers', {})
    return html("Error: Requested URL {} not found".format(request.url), status=404, headers=headers)


def _save_agents():
    """Write the in-memory agent table to AGENTS_FILE under the module lock."""
    with lock:
        with open(AGENTS_FILE, "w") as f:
            json.dump(agents, f)


def _send_mdm_command(command):
    """POST *command* to the local MicroMDM command API and return the response."""
    return requests.post(MICROMDM_API + "/v1/commands", auth=MICROMDM_AUTH, json=command, verify=False)


def _mythic_request(uuid, message):
    """Send *message* to Mythic on behalf of *uuid* and return the decoded reply.

    The request body is base64(uuid + JSON); the reply is base64-decoded and
    its leading UUID is stripped before the JSON is parsed.
    """
    body = b64encode((uuid + json.dumps(message)).encode())
    response = requests.post(config['mythic_address'], data=body, headers={"Mythic": "mdm"}, verify=False)
    return json.loads(b64decode(response.text)[MYTHIC_UUID_LENGTH:])


async def _find_payload_uuid(profiles):
    """Return the display name of the com.apple.mdm payload, or None if absent.

    As in the original loop, only the first matching payload of each profile
    is considered and the last matching profile wins.
    """
    found = None
    for profile in profiles:
        for content in profile.get('PayloadContent') or []:
            if content.get('PayloadType') == "com.apple.mdm":
                found = content['PayloadDisplayName']
                await print_flush("Found payload ID: {}".format(found))
                break
    return found


async def _handle_authenticate(body):
    """Record a newly enrolled device and ask it for its profile list."""
    payload = plistlib.loads(b64decode(body['checkin_event']['raw_payload']))
    await print_flush(payload)
    # Issue: a callback has to be tied to a specific payload, which MDM does
    # not natively allow because we don't control the data being sent.
    # Fix: embed data in an installed profile, then immediately query for it.
    udid = payload['UDID']
    agent = agents.setdefault(udid, {})
    agent['os'] = payload['OSVersion']
    agent['host'] = payload['DeviceName']
    agent['tasks'] = {}
    _save_agents()
    await asyncio.sleep(1)
    _send_mdm_command({"udid": udid, "request_type": "ProfileList"})


async def _first_checkin(udid, payload_uuid):
    """Register the device with Mythic and store the callback UUID it assigns."""
    agent = agents[udid]
    agent['payload_uuid'] = payload_uuid
    checkin_payload = {
        "action": "checkin",
        "ip": "127.0.0.1",
        "os": agent['os'],
        "user": "mdmclient",
        "host": agent['host'],
        "pid": 0,
        "uuid": payload_uuid,
    }
    decoded = _mythic_request(payload_uuid, checkin_payload)
    agent['callback_uuid'] = decoded['id']
    _save_agents()
    await print_flush("Connect back from payload, assigned {}".format(decoded['id']))


async def _report_task_output(udid, payload):
    """Forward a command result to Mythic as the output of the task it maps to."""
    tasks = agents[udid]['tasks']
    command_uuid = payload['CommandUUID']
    if command_uuid not in tasks:
        await print_flush("ERROR: This command wasn't linked to a task launched from Mythic. Not reporting output.")
        return
    post_response_payload = {
        "action": "post_response",
        "responses": [
            {
                "task_id": tasks[command_uuid],
                "user_output": json.dumps(payload, default=str, indent=4, sort_keys=True),
                "completed": True
            }
        ]
    }
    decoded = _mythic_request(agents[udid]['callback_uuid'], post_response_payload)
    await print_flush(decoded)
    del tasks[command_uuid]
    _save_agents()


def _stage_pkg(task):
    """Write the task's signed pkg into /pkg, publish it, and return its manifest URL."""
    signed_pkg = json.loads(task['parameters'])['file']
    pkg_name = ''.join(random.choice(string.ascii_letters) for _ in range(8))
    pkg_path = "/pkg/{}.pkg".format(pkg_name)
    manifest_path = "/pkg/{}.plist".format(pkg_name)
    with open(pkg_path, "wb") as f:
        f.write(b64decode(signed_pkg))
    # mdmctl generates the manifest file for the package.
    subprocess.run([
        "/build/linux/mdmctl",
        "apply", "app",
        "-pkg", pkg_path,
        "-pkg-url", "https://{}/repo/{}.pkg".format(mdm_host, pkg_name),
        "-upload"
    ])
    with open(manifest_path, "r") as f:
        manifest = f.read()
    # str.replace returns a new string; the original dropped the result, so
    # the manifest was written back unchanged.
    manifest = manifest.replace("localhost", mdm_host)
    with open(manifest_path, "w") as f:
        f.write(manifest)
    return "https://{}/repo/{}.plist".format(mdm_host, pkg_name)


def _build_mdm_command(task, udid):
    """Translate a Mythic task into a MicroMDM command dict, or None if unsupported."""
    name = task['command']
    if name == "device_information":
        return {"udid": udid, "request_type": "DeviceInformation", "queries": []}
    if name in SIMPLE_REQUEST_TYPES:
        return {"udid": udid, "request_type": SIMPLE_REQUEST_TYPES[name]}
    if name == 'install_profile':
        print(task)
        return {
            "udid": udid,
            "request_type": "InstallProfile",
            "payload": json.loads(task['parameters'])['file']
        }
    if name == 'install_pkg':
        return {
            "udid": udid,
            "request_type": "InstallApplication",
            "manifest_url": _stage_pkg(task)
        }
    return None


async def _dispatch_pending_tasks(udid):
    """Fetch the device's pending Mythic tasks and queue each one in MicroMDM."""
    agent = agents[udid]
    if 'callback_uuid' not in agent:
        await print_flush("ERROR: Idle device {} has no Mythic callback yet. Not fetching tasks.".format(udid))
        return
    decoded = _mythic_request(agent['callback_uuid'], {"action": "get_tasking", "tasking_size": -1})
    await print_flush(decoded)
    for task in decoded['tasks']:
        command = _build_mdm_command(task, udid)
        if command is None:
            # Previously an unknown command re-sent the prior iteration's
            # command (or raised NameError on the first iteration).
            await print_flush("ERROR: Unsupported command {}. Skipping.".format(task['command']))
            continue
        r = _send_mdm_command(command)
        # Map the MicroMDM command UUID to the Mythic task UUID.
        agent['tasks'][r.json()['payload']['command_uuid']] = task['id']
        _save_agents()


async def _handle_connect(body):
    """Process a device response: first check-in, task output, or idle poll."""
    payload = plistlib.loads(b64decode(body['acknowledge_event']['raw_payload']))
    await print_flush(payload)
    status = payload['Status']
    if status not in ('Acknowledged', 'Idle'):
        return
    udid = payload['UDID']
    if udid not in agents:
        await print_flush("ERROR: Response from unknown device {}. Ignoring.".format(udid))
        return
    if status == 'Idle':
        await _dispatch_pending_tasks(udid)
    elif "ProfileList" in payload and "callback_uuid" not in agents[udid]:
        # No callback yet, so this ProfileList is the post-enrolment query.
        payload_uuid = await _find_payload_uuid(payload['ProfileList'])
        if payload_uuid is None:
            await print_flush("ERROR: No com.apple.mdm payload found for {}. Cannot check in.".format(udid))
            return
        await _first_checkin(udid, payload_uuid)
    else:
        # Every other acknowledged command, including a tasked ProfileList.
        await _report_task_output(udid, payload)


async def _handle_force_callback(body):
    """Ask MicroMDM to push the device that owns the given Mythic callback."""
    callback_uuid = body['callback_uuid']
    await print_flush(body)
    udid = None
    for agent_udid, agent in agents.items():
        # Agents that have not checked in yet have no 'callback_uuid' key.
        if 'callback_uuid' in agent and agent['callback_uuid'] == callback_uuid:
            udid = agent_udid
            break
    if not udid:
        raise Exception("Agent not found")
    await print_flush("Forced callback for {} ({})".format(callback_uuid, udid))
    requests.get(MICROMDM_API + "/push/{}".format(udid), auth=MICROMDM_AUTH, verify=False)


async def agent_message(request, **kwargs):
    """Sanic handler for the /webhook route.

    Dispatches on the JSON 'topic' of a POST body and returns an empty 200
    response; any exception is turned into the 404 error page.
    """
    try:
        if config[request.app.name]['debug']:
            await print_flush("agent_message request from: {} with {} and {}".format(request.url, request.cookies, request.headers))
            await print_flush(request.json)
        if request.method == "POST":
            topic = request.json['topic']
            if topic == 'mdm.Authenticate':
                # First time we've seen an agent.
                await _handle_authenticate(request.json)
            elif topic == 'mdm.TokenUpdate':
                # Parsed as in the original, but the result is not used.
                plistlib.loads(b64decode(request.json['checkin_event']['raw_payload']))
            elif topic == 'mdm.Connect':
                # Agent is returning data for some command.
                await _handle_connect(request.json)
            elif topic == "mythic.force_callback":
                await _handle_force_callback(request.json)
        return raw('', status=200)
    except Exception as e:
        if config[request.app.name]['debug']:
            await print_flush("error in agent_message: {}".format(str(e)))
        # The original called no_match(), which is not defined in this file.
        return server_error_handler(request, NotFound)


if __name__ == "__main__":
    sys.path.append("/Mythic/mythic")
    from mythic_c2_container.C2ProfileBase import *
    from mythic_c2_container.MythicCallbackRPC import *
    with open("config.json", 'rb') as config_file:
        main_config = json.loads(config_file.read().decode('utf-8'))
    print("Opening config and starting mdm...")
    sys.stdout.flush()
    # basic mapping of the general endpoints to the real endpoints
    try:
        config['mythic_address'] = os.environ['MYTHIC_ADDRESS']
    except KeyError:
        print("failed to find MYTHIC_ADDRESS environment variable")
        sys.stdout.flush()
        sys.exit(1)
    config[str(main_config['webhook_port'])] = {'debug': main_config['debug']}
    if main_config['debug']:
        print("Debugging statements are enabled. This gives more context, but might be a performance hit")
    else:
        print("Debugging statements are disabled")
    sys.stdout.flush()
    with open("/certs/mdm.crt", "wb") as f:
        f.write(b64decode(main_config['tls_cert']))
    with open("/certs/mdm.key", "wb") as f:
        f.write(b64decode(main_config['tls_key']))
    mdm_host = main_config['mdm_host']
    # load agents
    lock = threading.Lock()
    if os.path.exists(AGENTS_FILE):
        with open(AGENTS_FILE, "r") as f:
            agents = json.load(f)
    os.system("cp /var/db/micromdm/micromdm.db /Mythic/micromdm.db")
    os.system("ln -sf /Mythic/micromdm.db /var/db/micromdm/micromdm.db")
    # start mdm
    p = subprocess.Popen([
        "/build/linux/micromdm", "serve",
        "-server-url=https://{}".format(main_config['mdm_host']),
        "-api-key", "mythic",
        "-tls-cert", "/certs/mdm.crt",
        "-tls-key", "/certs/mdm.key",
        "-filerepo", "/pkg",
        "-command-webhook-url", "http://localhost:{}/webhook".format(main_config['webhook_port'])
    ])
    # now to create an app to handle webhook
    app = Sanic(str(main_config['webhook_port']))
    app.config['REQUEST_MAX_SIZE'] = 1000000000
    app.config['REQUEST_TIMEOUT'] = 600
    app.config['RESPONSE_TIMEOUT'] = 600
    app.add_route(agent_message, "/webhook", methods=['GET', 'POST'])
    app.error_handler.add(Exception, server_error_handler)
    debug_enabled = bool(main_config['debug'])
    server = app.create_server(host="127.0.0.1", port=main_config['webhook_port'], debug=debug_enabled, return_asyncio_server=True, access_log=debug_enabled)
    task = asyncio.ensure_future(server)
    try:
        loop = asyncio.get_event_loop()

        def callback(fut):
            try:
                fetch_count = fut.result()
            except Exception:
                print("port already in use")
                sys.stdout.flush()
                sys.exit()

        task.add_done_callback(callback)
        loop.run_forever()
    except (Exception, KeyboardInterrupt):
        sys.stdout.flush()
        sys.exit()
Source:user_dal.py  
import redis
import time
import json
from datetime import datetime
from api.user.constants import INVALID_IDFAS
from api.constants import EnumUserGroup
from api.log import exception_logger, api_logger

from api.user.models import UserSelectInfo, UniqueDeviceInfo
from pyutil.api.redis_key import get_songlist_receive_version_for_udid_key, \
    get_songlist_receive_version_key, get_uid_info_key
from pyutil.api.account import Platform
from api.user.types import MAX_DEVICES_ONLINE, LoginStatus, User
from api.channel.constants import VALID_GENDERS, VALID_AGES


def _is_invalid_idfa(idfa):
    """Return True if idfa is empty, listed in INVALID_IDFAS, or has fewer than 5 distinct characters."""
    return not idfa or idfa in INVALID_IDFAS or len(set(idfa)) < 5


# NOTE: this class was previously defined twice in this module with identical
# bodies (the later definition shadowed the earlier one); one copy is kept.
class UniqueDeviceInfoDAL(object):
    """Access layer for UniqueDeviceInfo records and per-udid timestamps in Redis."""

    def __init__(self, redis_pool):
        self.udid_status_redis = redis.StrictRedis(connection_pool=redis_pool)

    @staticmethod
    def get_udid_by_idfa(idfa):
        """Return the unique device id stored for idfa, or None.

        None is returned for an invalid idfa, when no record exists, or when
        the stored unique device id is 10 characters or shorter.
        """
        if _is_invalid_idfa(idfa):
            api_logger.error('invalid idfa:{}'.format(idfa))
            return None
        # Only the first record is used, so do not load the whole result set.
        record = UniqueDeviceInfo.objects(idfa=idfa).first()
        if record and len(record.unique_device_id) > 10:
            api_logger.info('get_udid_by_idfa succeed|idfa:{}|udid:{}'.format(idfa, record.unique_device_id))
            return record.unique_device_id
        return None

    @staticmethod
    def set_udid_by_idfa(idfa, udid, e_flag=''):
        """Create the (udid, idfa) record if it does not exist yet."""
        # Statistics requirement: an invalid idfa is stored as the empty string.
        if _is_invalid_idfa(idfa):
            api_logger.error('invalid idfa:{}|udid:{}'.format(idfa, udid))
            idfa = ''
        device_info = UniqueDeviceInfo.objects(unique_device_id=udid, idfa=idfa).first()
        if device_info is None:
            device_info = UniqueDeviceInfo(idfa=idfa, unique_device_id=udid)
            if e_flag:
                device_info.e_flag = e_flag
            device_info.save()
            api_logger.info('idfa:{}|udid:{}'.format(idfa, udid))

    @staticmethod
    def set_udid_by_atd(atd, udid, e_flag=''):
        """Create the (udid, atd) record if it does not exist yet.

        e_flag is only stored when no record for udid already has an e_flag.
        """
        # Statistics requirement: an invalid atd is stored as the empty string.
        if not atd or len(set(atd)) < 5:
            api_logger.error('invalid atd:{}|udid:{}'.format(atd, udid))
            atd = ''
        device_info = UniqueDeviceInfo.objects(unique_device_id=udid, atd=atd).first()
        if device_info is None:
            device_info = UniqueDeviceInfo(unique_device_id=udid, atd=atd)
            if e_flag and UniqueDeviceInfo.objects(unique_device_id=udid, e_flag__exists=1).first() is None:
                device_info.e_flag = e_flag
            device_info.save()
            api_logger.info('atd:{}|udid:{}'.format(atd, udid))

    @staticmethod
    def set_udid_by_afid(af_id, udid):
        """Set af_id on the first record for udid, creating the record if needed."""
        if not af_id:
            api_logger.error('invalid af_id:{}'.format(af_id))
            return
        device_info = UniqueDeviceInfo.objects(unique_device_id=udid).first()
        if not device_info:
            device_info = UniqueDeviceInfo()
            device_info.unique_device_id = udid
        device_info.af_id = af_id
        device_info.save()
        api_logger.info('af_id:{}|udid:{}'.format(af_id, udid))

    def get_udid_status(self, udid, atd):
        """Return EnumUserGroup.NEW if udid's timestamp falls on today's local date, else ALL.

        A udid with no timestamp in Redis is treated as created now. Any
        exception is logged and EnumUserGroup.ALL is returned.
        """
        udid_status = EnumUserGroup.ALL
        try:
            udid_ctime = int(self.udid_status_redis.get(udid) or time.time())
            today = datetime.now().timetuple()
            created = time.localtime(udid_ctime)
            if (today.tm_year, today.tm_yday) == (created.tm_year, created.tm_yday):
                udid_status = EnumUserGroup.NEW
                # New users: record the atd as well.
                if atd:
                    UniqueDeviceInfoDAL.set_udid_by_atd(atd, udid)
        except Exception as e:
            exception_logger.exception('udid:{} except:{}'.format(udid, e))

        return udid_status

    def set_udid_ts(self, udid):
        """Store the current timestamp for udid unless one is already set; errors are logged."""
        try:
            self.udid_status_redis.setnx(udid, int(time.time()))
        except Exception as e:
            exception_logger.exception('udid:{}, e:{}'.format(udid, e))

    def get_udid_ts(self, udid):
        """Return True if Redis holds a truthy value for udid."""
        return bool(self.udid_status_redis.get(udid))


class UserDAL():
    """Login-state and song-list version helpers backed by a caller-supplied Redis client."""

    @staticmethod
    def _load_online_devices(redis_cli, key):
        """Return the decoded 'online_devices' list stored under key, or [] if unset."""
        raw_online_devices = redis_cli.hget(key, 'online_devices')
        return json.loads(raw_online_devices) if raw_online_devices else []

    def clear_device(self, uid, udid, uid_info_redis_cli):
        """Remove udid from uid's online devices; delete the whole key when none remain."""
        key = get_uid_info_key(uid)
        online_devices = [
            _udid
            for _udid in self._load_online_devices(uid_info_redis_cli, key)
            if _udid != udid
        ]
        if online_devices:
            uid_info_redis_cli.hset(
                key, 'online_devices', json.dumps(online_devices))
        else:
            uid_info_redis_cli.delete(key)

    def get_login_status(
        self, token_expire_at, uid, udid, platform, redis_cli
    ):
        """Return the LoginStatus for this token and device.

        An expired token yields the platform-specific expiry status;
        otherwise OK if udid is among uid's online devices, else FORCE_LOGOUT.
        """
        if token_expire_at < int(time.time()):
            if platform == Platform.FACEBOOK:
                return LoginStatus.FACEBOOK_TOKEN_EXPIRE
            if platform == Platform.TWITTER:
                return LoginStatus.TWITTER_TOKEN_EXPIRE
            return LoginStatus.UNKNOWN_EXPIRE
        online_devices = self._load_online_devices(redis_cli, get_uid_info_key(uid))
        if udid in online_devices:
            return LoginStatus.OK
        return LoginStatus.FORCE_LOGOUT

    def save_uid_online_devices(self, uid, udid, redis_cli):
        """Append udid to uid's online devices, keeping only the last MAX_DEVICES_ONLINE."""
        key = get_uid_info_key(uid)
        online_devices = self._load_online_devices(redis_cli, key)
        if udid not in online_devices:
            online_devices.append(udid)
        online_devices = online_devices[-MAX_DEVICES_ONLINE:]
        redis_cli.hset(key, 'online_devices', json.dumps(online_devices))

    def get_songlist_cvs(self, uid, udid, redis_cli):
        """Return the stored song-list version for udid as an int, or -1 if absent.

        The version map is looked up by uid when given, otherwise by udid.
        """
        if uid:
            owner_id, id_type = uid, User.TYPE_UID
        else:
            owner_id, id_type = udid, User.TYPE_UNIQUE_DEVICE_ID
        cvs_key = get_songlist_receive_version_key(owner_id, id_type)
        version_info = redis_cli.get(cvs_key)
        version_key = get_songlist_receive_version_for_udid_key(udid)
        # SECURITY NOTE(review): eval() executes whatever is stored in Redis.
        # Left unchanged because the stored format is not visible from here;
        # switch to a safe parser once the writer side is confirmed.
        version_info = {} if not version_info else eval(version_info)
        return int(version_info.get(version_key, -1))


class UserSelectInfoDAL(object):
    """Access layer for a user's selected gender and age stage."""

    def __init__(self):
        pass

    @staticmethod
    def get(udid, uid):
        """Return {'gender', 'age_stage'} for the user, or {} when unavailable.

        The record is keyed by uid (user_type 1) when given, else by udid
        (user_type 0).
        """
        pk, utype = (uid, 1) if uid else (udid, 0)
        try:
            user_select_info = UserSelectInfo.objects.get(user_id=pk, user_type=utype)
            return dict(gender=user_select_info.gender, age_stage=user_select_info.age_stage)
        except UserSelectInfo.DoesNotExist:
            # No selection saved yet: the expected "empty" case.
            return {}
        except Exception as e:
            # Still best-effort, but no longer silent.
            exception_logger.exception('udid:{}|uid:{} except:{}'.format(udid, uid, e))
            return {}

    @staticmethod
    def set(udid, gender, age_stage, uid):
        """Upsert the user's gender and/or age stage and return the stored values.

        A value of -1 for one field leaves that field untouched. Raises
        Exception when neither value is valid.
        """
        pk, utype = (uid, 1) if uid else (udid, 0)
        # At least one of gender / age stage must be valid.
        if gender not in VALID_GENDERS and age_stage not in VALID_AGES:
            raise Exception('udid={}|invalid gender={}|age={}'.format(udid, gender, age_stage))

        if gender == -1:
            updates = {'age_stage': age_stage}
        elif age_stage == -1:
            updates = {'gender': gender}
        else:
            updates = {'gender': gender, 'age_stage': age_stage}
        rv = UserSelectInfo.objects(user_id=pk, user_type=utype).modify(
            mtime=datetime.now(), upsert=True, new=True, **updates)
        return {'gender': rv.gender, 'age_stage': rv.age_stage}
...iossim_util.py
Source:iossim_util.py  
...56    if runtime['version'] == version and 'iOS' in runtime['name']:57      return runtime['identifier']58  raise test_runner.SimulatorNotFoundError('Not found "%s" SDK in runtimes %s' %59                                           (version, simulators['runtimes']))60def get_simulator_runtime_by_device_udid(simulator_udid):61  """Gets simulator runtime based on simulator UDID.62  Args:63    simulator_udid: (str) UDID of a simulator.64  """65  simulator_list = get_simulator_list()['devices']66  for runtime, simulators in simulator_list.items():67    for device in simulators:68      if simulator_udid == device['udid']:69        return runtime70  raise test_runner.SimulatorNotFoundError(71      'Not found simulator with "%s" UDID in devices %s' % (simulator_udid,72                                                            simulator_list))73def get_simulator_udids_by_platform_and_version(platform, version):74  """Gets list of simulators UDID based on platform name and iOS version.75    Args:76      platform: (str) A platform name, e.g. "iPhone 11"77      version: (str) A version name, e.g. "13.2.2"78  """79  simulators = get_simulator_list()80  devices = simulators['devices']81  sdk_id = get_simulator_runtime_by_version(simulators, version)82  results = []83  for device in devices.get(sdk_id, []):84    if device['name'] == _compose_simulator_name(platform, version):85      results.append(device['udid'])86  return results87def create_device_by_platform_and_version(platform, version):88  """Creates a simulator and returns UDID of it.89    Args:90      platform: (str) A platform name, e.g. "iPhone 11"91      version: (str) A version name, e.g. 
"13.2.2"92  """93  name = _compose_simulator_name(platform, version)94  LOGGER.info('Creating simulator %s', name)95  simulators = get_simulator_list()96  device_type = get_simulator_device_type_by_platform(simulators, platform)97  runtime = get_simulator_runtime_by_version(simulators, version)98  try:99    udid = subprocess.check_output(100        ['xcrun', 'simctl', 'create', name, device_type, runtime]).rstrip()101    LOGGER.info('Created simulator in first attempt with UDID: %s', udid)102    # Sometimes above command fails to create a simulator. Verify it and retry103    # once if first attempt failed.104    if not is_device_with_udid_simulator(udid):105      # Try to delete once to avoid duplicate in case of race condition.106      delete_simulator_by_udid(udid)107      udid = subprocess.check_output(108          ['xcrun', 'simctl', 'create', name, device_type, runtime]).rstrip()109      LOGGER.info('Created simulator in second attempt with UDID: %s', udid)110    return udid111  except subprocess.CalledProcessError as e:112    LOGGER.error('Error when creating simulator "%s": %s' % (name, e.output))113    raise e114def delete_simulator_by_udid(udid):115  """Deletes simulator by its udid.116  Args:117    udid: (str) UDID of simulator.118  """119  LOGGER.info('Deleting simulator %s', udid)120  try:121    subprocess.check_output(['xcrun', 'simctl', 'delete', udid],122                            stderr=subprocess.STDOUT)123  except subprocess.CalledProcessError as e:124    # Logging error instead of throwing so we don't cause failures in case125    # this was indeed failing to clean up.126    message = 'Failed to delete simulator %s with error %s' % (udid, e.output)127    LOGGER.error(message)128def wipe_simulator_by_udid(udid):129  """Wipes simulators by its udid.130  Args:131    udid: (str) UDID of simulator.132  """133  for _, devices in get_simulator_list()['devices'].items():134    for device in devices:135      if device['udid'] != udid:136        
continue137      try:138        LOGGER.info('Shutdown simulator %s ', device)139        if device['state'] != 'Shutdown':140          subprocess.check_call(['xcrun', 'simctl', 'shutdown', device['udid']])141      except subprocess.CalledProcessError as ex:142        LOGGER.error('Shutdown failed %s ', ex)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!
