How to use assert_exist method in Airtest

Best Python code snippet using Airtest

api_server.py

Source: api_server.py (GitHub)

copy

Full Screen

import random
import time
from flask import Flask, request
import json
import uuid
import etcd3
import sys
import os
import requests
import logging

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.join(BASE_DIR, os.path.pardir)
# The project-local modules below live in sibling directories, so the search
# path has to be extended before they can be imported.
sys.path.append(os.path.join(BASE_DIR, '../helper'))
import utils
import const
import yaml_loader
from serverless import ServerlessFunction, Edge, DAG
sys.path.append(os.path.join(BASE_DIR, '../worker'))
from entities import parse_bytes
import etcd_controller

app = Flask(__name__)
# CORS(app, supports_credentials=True)
use_etcd = False  # True
etcd = etcd3.client(port=2379)
etcd_supplant = dict()  # in-memory stand-in used when use_etcd is False
api_server_url = None

# Maps the <behavior> URL segment of the Service/Dns update routes to the
# status string written into the stored config.
_BEHAVIOR_TO_STATUS = {
    'create': 'Creating',
    'update': 'Updating',
    'restart': 'Restarting',
    'running': 'Running',
    'remove': 'Removing',
    'none': 'None',
}


def get(key, assert_exist=True):
    """Return the JSON-decoded value stored under ``key``.

    Reads from etcd when ``use_etcd`` is true, otherwise from the in-memory
    ``etcd_supplant`` dict. Values are stored as JSON text, so every call
    returns a fresh (deep-copied) object.

    :param key: storage key.
    :param assert_exist: when true, a missing key raises; otherwise ``None``
        is returned.
    :raises FileNotFoundError: if the key is missing and ``assert_exist``.
    """
    if use_etcd:
        value, _ = etcd.get(key)
        if value:
            return json.loads(value)
    elif key in etcd_supplant:
        return json.loads(etcd_supplant[key])  # force deep copy here
    # key not exist
    if assert_exist:
        raise FileNotFoundError("key not found: {}".format(key))
    return None


def put(key, value):
    """Store ``value`` under ``key`` as JSON text (etcd or in-memory dict)."""
    if use_etcd:
        etcd.put(key, json.dumps(value))
    else:
        etcd_supplant[key] = json.dumps(value)  # force deep copy here


def get_api_server_url():
    """Return the API server URL stored under the ``api_server_url`` key."""
    return get('api_server_url')


def init_api_server():
    """Load the API server URL from disk and create any missing index keys.

    Reads ``const.MASTER_API_SERVER_URL_PATH``, stores its content under
    ``api_server_url``, updates the module-level ``api_server_url`` and makes
    sure every ``*_list`` key (and ``dns_config``) exists.
    """
    global api_server_url
    with open(const.MASTER_API_SERVER_URL_PATH, 'r') as f:
        API_SERVER_URL = str(f.read())
    print(API_SERVER_URL)
    # for the very first start, init etcd
    put('api_server_url', API_SERVER_URL)
    api_server_url = get('api_server_url')
    list_keys = ('nodes_list', 'pods_list', 'services_list', 'replica_sets_list', 'dag_list',
                 'functions_list', 'hpa_list', 'jobs_list', 'dns_list')
    for list_key in list_keys:
        if get(list_key, assert_exist=False) is None:
            put(list_key, list())
    if get('dns_config', assert_exist=False) is None:
        put('dns_config', dict())


def delete_key(key):
    """Remove ``key`` from the store; a missing key is ignored in both modes."""
    if use_etcd:
        etcd.delete(key)
    else:
        # pop with a default so the in-memory store does not raise KeyError
        # on a missing key
        etcd_supplant.pop(key, None)


@app.route('/Node', methods=['GET'])
def handle_nodes():
    """Return every registered node config and ping each node's /heartbeat."""
    result = dict()
    result['nodes_list'] = get('nodes_list')
    for node_instance_name in result['nodes_list']:
        result[node_instance_name] = get(node_instance_name)
        try:  # we allow timeout
            requests.get(url=result[node_instance_name]['url'] + "/heartbeat", timeout=0.01)
        except Exception as e:
            # best effort: a failed heartbeat must not fail the listing
            logging.debug("heartbeat to node %s failed: %s", node_instance_name, e)
    return json.dumps(result)


@app.route('/Node', methods=['POST'])
def upload_nodes():
    """Register (or re-register) a node from the posted JSON config."""
    json_data = request.json
    node_config: dict = json.loads(json_data)
    node_instance_name = node_config['instance_name']
    node_config['last_receive_time'] = time.time()
    node_config['status'] = 'READY TO START'
    nodes_list: list = get('nodes_list')
    # node instance name bind with physical mac address
    if node_instance_name not in nodes_list:
        nodes_list.append(node_instance_name)
    put('nodes_list', nodes_list)
    put(node_instance_name, node_config)
    return json.dumps(get('nodes_list')), 200


@app.route('/Node/<string:node_instance_name>', methods=['DELETE'])
def delete_node(node_instance_name: str):
    """Mark a node 'Not Available' and its pods 'Lost Connect'."""
    # set node to not available
    node_instance_config = get(node_instance_name)
    node_instance_config['status'] = 'Not Available'
    put(node_instance_name, node_instance_config)  # save changed status
    # set pod to lost connect
    for pod_instance_name in get('pods_list'):
        pod_config = get(pod_instance_name)
        if 'node' in pod_config and pod_config['node'] == node_instance_name:
            pod_config['status'] = 'Lost Connect'
            put(pod_instance_name, pod_config)  # save changed status
    return json.dumps(get('nodes_list')), 200


@app.route('/Node/<string:node_instance_name>', methods=['GET'])
def get_node_instance(node_instance_name: str):
    """Return one node's config with the configs of its pods inlined.

    An unknown node yields an empty JSON object (still with status 200).
    """
    node_instance_config = get(node_instance_name, assert_exist=False)
    if not node_instance_config:
        return json.dumps(dict()), 200
    if node_instance_config.get('pod_instances'):
        for pod_instance_name in node_instance_config['pod_instances']:
            node_instance_config[pod_instance_name] = get(pod_instance_name)
    return json.dumps(node_instance_config), 200


@app.route('/', methods=['GET'])
def get_all():
    """Root endpoint; always answers with the JSON string 'not good'."""
    return json.dumps('not good'), 200


@app.route('/Pod', methods=['GET'])
def get_pods():
    """Return the pod name list plus every pod config that still exists."""
    result = dict()
    result['pods_list'] = get('pods_list')
    for pod_instance_name in result['pods_list']:
        pod_config = get(pod_instance_name, assert_exist=False)
        if pod_config:
            result[pod_instance_name] = pod_config
    return json.dumps(result), 200


@app.route('/Service', methods=['GET'])
def get_services():
    """Return the service name list plus every service config that exists."""
    result = dict()
    result['services_list'] = get('services_list')
    for service_instance_name in result['services_list']:
        service_config = get(service_instance_name, assert_exist=False)
        if service_config:
            result[service_instance_name] = service_config
    return json.dumps(result), 200


@app.route('/ReplicaSet', methods=['GET'])
def get_replica_set():
    """Return every replica set config together with its existing pods."""
    result = dict()
    result['replica_sets_list'] = get('replica_sets_list')
    for replica_set_instance in result['replica_sets_list']:
        config = get(replica_set_instance)
        result[replica_set_instance] = config
        for pod_instance_name in config['pod_instances']:
            pod_config = get(pod_instance_name, assert_exist=False)
            if pod_config:
                result[pod_instance_name] = pod_config
    return json.dumps(result), 200


@app.route('/HorizontalPodAutoscaler', methods=['GET'])
def get_hpa():
    """Return the HPA name list plus each HPA config."""
    result = dict()
    result['hpa_list'] = get('hpa_list')
    for hpa_instance in result['hpa_list']:
        result[hpa_instance] = get(hpa_instance)
    return json.dumps(result), 200


@app.route('/HorizontalPodAutoscaler', methods=['POST'])
def upload_hpa():
    """Store a posted HPA config as a ReplicaSet flagged with ``isHPA``.

    The instance is recorded in both ``hpa_list`` and ``replica_sets_list``
    and starts with ``spec.replicas`` equal to ``minReplicas``.
    """
    json_data = request.json
    HPA_config: dict = json.loads(json_data)
    assert HPA_config['kind'] == 'HorizontalPodAutoscaler'
    replica_set_instance_name = HPA_config['name'] + str(uuid.uuid1())
    replica_set_config = HPA_config  # same dict, rewritten in place
    replica_set_config['instance_name'] = replica_set_instance_name
    replica_set_config['spec'] = dict()
    replica_set_config['spec']['replicas'] = replica_set_config['minReplicas']
    replica_set_config['kind'] = 'ReplicaSet'
    replica_set_config['created_time'] = time.time()
    replica_set_config['last_change_time'] = time.time()
    replica_set_config['isHPA'] = True
    replica_set_config['pod_instances'] = list()
    hpa_list: list = get('hpa_list')
    hpa_list.append(replica_set_instance_name)
    put('hpa_list', hpa_list)
    replica_sets_list: list = get('replica_sets_list')
    replica_sets_list.append(replica_set_instance_name)
    put('replica_sets_list', replica_sets_list)
    put(replica_set_instance_name, replica_set_config)
    return "Successfully create HPA set instance {}".format(replica_set_instance_name), 200


@app.route('/Dns', methods=['GET'])
def get_dns():
    """Return the DNS name list plus every DNS config that still exists."""
    result = dict()
    result['dns_list'] = get('dns_list')
    for dns_instance_name in result['dns_list']:
        dns_instance_config = get(dns_instance_name, assert_exist=False)
        if dns_instance_config:
            result[dns_instance_name] = dns_instance_config
    return json.dumps(result), 200


@app.route('/Dns/Config', methods=['GET'])
def get_dns_config():
    """Return the stored ``dns_config`` dict."""
    return json.dumps(get('dns_config')), 200


@app.route('/Dns/Config', methods=['POST'])
def post_dns_config():
    """Merge the posted keys into ``dns_config`` and return the result."""
    json_data = request.json
    new_config: dict = json.loads(json_data)
    config: dict = get('dns_config')
    config.update(new_config)
    put('dns_config', config)
    return json.dumps(config)


@app.route('/Pod', methods=['POST'])
def post_pods():
    """Create a pod instance from the posted config and try to schedule it."""
    json_data = request.json
    config: dict = json.loads(json_data)
    assert config['kind'] == 'Pod'
    instance_name = config['name'] + str(uuid.uuid1())
    config['instance_name'] = instance_name
    config['status'] = 'Wait for Schedule'
    config['created_time'] = time.time()
    if config.get('strategy') is None:
        config['strategy'] = 'roundrobin'
    pods_list: list = get('pods_list')
    pods_list.append(instance_name)
    put('pods_list', pods_list)
    put(instance_name, config)
    schedule(config)
    return json.dumps(config), 200


def schedule(config):
    """Pick a node for a pod waiting to be scheduled and post the outcome.

    Only configs with kind 'Pod' and status 'Wait for Schedule' are handled.
    Nodes are tried in round-robin or random order; the first node whose
    status is 'Running' and whose ``free_memory`` exceeds the pod's ``mem``
    is chosen. The (mutated) config is then posted to
    ``/Pod/<instance_name>/create`` whether or not a node was found.
    """
    if not (config['kind'] == 'Pod' and config['status'] == 'Wait for Schedule'):
        return
    # GET /Node makes handle_nodes ping every node's /heartbeat endpoint.
    # NOTE(review): presumably this refreshes node status before scheduling -- confirm.
    requests.get(url='{}/Node'.format(api_server_url))
    nodes_list = get('nodes_list')
    instance_name = config['instance_name']
    mem_need = parse_bytes(config['mem'])
    config['status'] = 'Schedule Failed'  # overwritten below when a node fits
    if len(nodes_list) == 0:
        print("no node registered !")
    strategy = config['strategy'] if config.get('strategy') is not None else 'roundrobin'
    index_list = list()
    if strategy == 'roundrobin':
        schedule_counts = get('schedule_counts', assert_exist=False)
        if schedule_counts is None or schedule_counts > 10:
            schedule_counts = 0
        else:
            schedule_counts += 1
        put('schedule_counts', schedule_counts)
        index = schedule_counts % len(nodes_list) if len(nodes_list) > 0 else 0
        # start at `index` and wrap around so every node is tried once
        index_list = list(range(index, len(nodes_list))) + list(range(0, index))
    elif strategy == 'random':
        index_list = list(range(0, len(nodes_list)))
        random.shuffle(index_list)
    else:
        logging.warning('strategy not assigned...')
    for i in index_list:
        node_instance_name = nodes_list[i]
        # todo: check node status here
        current_node = get(node_instance_name)
        print("node status = ", current_node['status'])
        if current_node['status'] != 'Running':
            continue
        print("free_memory = {}, need_memory = {}".format(current_node['free_memory'], mem_need))
        if current_node['free_memory'] > mem_need:
            config['node'] = node_instance_name
            config['status'] = 'Ready to Create'
            break
    if 'node' in config:
        # message reads: "schedule pod {} onto node {}"
        print('把 pod {} 调度到节点 {} 上'.format(instance_name, config['node']))
    else:
        print("Schedule failure")
    url = "{}/Pod/{}/create".format(api_server_url, instance_name)
    json_data = json.dumps(config)
    # send the scheduling result to the api_server
    requests.post(url=url, json=json_data)


@app.route('/ReplicaSet', methods=['POST'])
def upload_replica_set():
    """Store a posted ReplicaSet config under a freshly generated name."""
    json_data = request.json
    config: dict = json.loads(json_data)
    assert config['kind'] == 'ReplicaSet'
    assert config['spec']['replicas'] > 0
    replica_set_instance_name = config['name'] + str(uuid.uuid1())
    config['instance_name'] = replica_set_instance_name
    config['pod_instances'] = list()
    config['created_time'] = time.time()
    replica_sets_list: list = get('replica_sets_list')
    replica_sets_list.append(replica_set_instance_name)
    put('replica_sets_list', replica_sets_list)
    put(replica_set_instance_name, config)
    return "Successfully create replica set instance {}".format(replica_set_instance_name), 200


@app.route('/Service', methods=['POST'])
def upload_service():
    """Store a posted Service config and post it to its own 'create' route."""
    json_data = request.json
    config: dict = json.loads(json_data)
    assert config['kind'] == 'Service'
    service_instance_name = config['name'] + str(uuid.uuid1())
    config['instance_name'] = service_instance_name
    config['pod_instances'] = list()
    config['created_time'] = time.time()
    config['status'] = 'Created'
    services_list: list = get('services_list')
    services_list.append(service_instance_name)
    put('services_list', services_list)
    put(service_instance_name, config)
    url = '{}/Service/{}/{}'.format(api_server_url, service_instance_name, 'create')
    utils.post(url=url, config=config)
    return "Successfully create service instance {}".format(service_instance_name), 200


@app.route('/Service/<string:instance_name>/<string:behavior>', methods=['POST'])
def update_service(instance_name: str, behavior: str):
    """Replace a service config, setting its status from ``behavior``.

    An unrecognised ``behavior`` leaves the posted status untouched.
    """
    json_data = request.json
    config: dict = json.loads(json_data)
    if behavior in _BEHAVIOR_TO_STATUS:
        config['status'] = _BEHAVIOR_TO_STATUS[behavior]
    put(instance_name, config)
    return "Successfully update service instance {}".format(instance_name), 200


@app.route('/Dns', methods=['POST'])
def upload_dns():
    """Store a posted Dns config and post it to its own 'create' route."""
    json_data = request.json
    config: dict = json.loads(json_data)
    assert config['kind'] == 'Dns'
    dns_instance_name = config['name'] + str(uuid.uuid1())
    config['instance_name'] = dns_instance_name
    config['service_instances'] = list()
    config['created_time'] = time.time()
    dns_list: list = get('dns_list')
    dns_list.append(dns_instance_name)
    put('dns_list', dns_list)
    put(dns_instance_name, config)
    url = "{}/Dns/{}/{}".format(api_server_url, dns_instance_name, 'create')
    utils.post(url=url, config=config)
    return "Successfully create dns instance {}".format(dns_instance_name), 200


@app.route('/Dns/<string:instance_name>/<string:behavior>', methods=['POST'])
def update_dns(instance_name: str, behavior: str):
    """Replace a dns config, setting its status from ``behavior``.

    An unrecognised ``behavior`` leaves the posted status untouched.
    """
    json_data = request.json
    config: dict = json.loads(json_data)
    if behavior in _BEHAVIOR_TO_STATUS:
        config['status'] = _BEHAVIOR_TO_STATUS[behavior]
    put(instance_name, config)
    return "Successfully update dns instance {}".format(instance_name)


@app.route('/ReplicaSet/<string:instance_name>/', methods=['POST'])
def update_replica_set(instance_name: str):
    """Replace a replica set config and mark it 'Creating'."""
    json_data = request.json
    config: dict = json.loads(json_data)
    config['status'] = 'Creating'
    put(instance_name, config)
    return "Successfully update replica set instance {}".format(instance_name), 200


@app.route('/Pod/<string:instance_name>', methods=['GET'])
def get_pod_instance(instance_name: str):
    """Return one pod config as JSON (``null`` when the pod is unknown)."""
    return json.dumps(get(instance_name, assert_exist=False))


@app.route('/Pod/<string:instance_name>/<string:behavior>', methods=['POST'])
def post_pod(instance_name: str, behavior: str):
    """Apply ``behavior`` to a pod and forward the config to its node.

    'update' and 'delete' only touch the store and return immediately.
    'create', 'remove' and 'execute' tag the config with a ``behavior`` field
    and, when the stored pod has a ``node``, post it to that node's ``/Pod``
    endpoint. Any other behavior answers 404.
    """
    if behavior == 'create':
        json_data = request.json
        config: dict = json.loads(json_data)
        put(instance_name, config)
        config['behavior'] = 'create'  # sent to the worker, not stored
    elif behavior == 'update':  # update pods information such as ip
        json_data = request.json
        config: dict = json.loads(json_data)
        put(instance_name, config)
        return 'success', 200
    elif behavior == 'remove':
        config = get(instance_name)
        config['status'] = 'Removed'
        put(instance_name, config)
        config['behavior'] = 'remove'
    elif behavior == 'execute':
        config = get(instance_name)
        json_data = request.json
        upload_cmd: dict = json.loads(json_data)
        config['behavior'] = 'execute'
        config['cmd'] = upload_cmd['cmd']
    elif behavior == 'delete':
        pods_list: list = get('pods_list')
        # Only drop the entry when it is really listed; popping index -1 for
        # an unknown name would silently remove an unrelated pod.
        if instance_name in pods_list:
            pods_list.remove(instance_name)
            put('pods_list', pods_list)
        delete_key(instance_name)
        return "success", 200
    else:
        return json.dumps(dict()), 404
    # todo: post pod information to related node
    worker_url = None
    if instance_name in get('pods_list'):
        pod_config = get(instance_name)
        if pod_config.get('node') is not None:
            worker_url = get(pod_config['node'])['url']
    if worker_url is not None:  # only scheduler successfully will have a worker_url
        requests.post(url=worker_url + "/Pod", json=json.dumps(config))
    return json.dumps(config), 200


@app.route('/Function', methods=['GET'])
def get_function():
    """Return the function name list plus each function config."""
    result = dict()
    result['functions_list'] = get('functions_list')
    for function_name in result['functions_list']:
        result[function_name] = get(function_name)
    return json.dumps(result), 200


@app.route('/Job', methods=['GET'])
def get_job():
    """Return the job name list plus each job config."""
    result = dict()
    result['jobs_list'] = get('jobs_list')
    for job_name in result['jobs_list']:
        result[job_name] = get(job_name)
    return json.dumps(result), 200


@app.route('/Job', methods=['POST'])
def upload_job():
    """Store a posted job; re-uploading a name removes the old job's pods."""
    json_data = request.json
    job_config: dict = json.loads(json_data)
    job_name = job_config['name']
    job_config['created_time'] = time.time()
    job_config['status'] = 'Uploaded'
    job_config['files_list'] = list()
    job_config['pod_instances'] = list()
    jobs_list: list = get('jobs_list')
    if job_name not in jobs_list:
        jobs_list.append(job_name)
    else:  # replace the old one
        old_job_config: dict = get(job_name)
        for old_pod_instance_name in old_job_config['pod_instances']:
            requests.post("{}/Pod/{}/remove".format(api_server_url, old_pod_instance_name))
    put('jobs_list', jobs_list)
    put(job_name, job_config)  # replace the old one if exist
    return json.dumps(get('jobs_list')), 200


@app.route('/Job/<string:instance_name>/<string:behavior>', methods=['POST'])
def handle_job(instance_name: str, behavior: str):
    """Dispatch a job action.

    Supported behaviors: 'upload_file', 'delete', 'start', 'submit' and
    'download'. Every path returns a Flask response; an unknown job or
    behavior answers 404.
    """
    json_data = request.json
    config: dict = json.loads(json_data)
    if behavior == 'upload_file':
        job_config = get(instance_name, assert_exist=False)
        if job_config is None:
            return "Not found", 404
        file_name = config['file_name']
        files_list: list = job_config['files_list']
        if file_name not in files_list:
            files_list.append(file_name)
        job_config[file_name] = config['file_data']
        put(instance_name, job_config)
        return "Successfully upload file {}".format(file_name), 200
    elif behavior == 'delete':
        jobs_list: list = get('jobs_list')
        if instance_name not in jobs_list:
            return "Not found", 404
        # jobs_list only holds names; the pod list lives in the stored config.
        job_config = get(instance_name, assert_exist=False)
        jobs_list.remove(instance_name)
        put('jobs_list', jobs_list)
        pod_instances = job_config.get('pod_instances', list()) if job_config else list()
        for pod_instance_name in pod_instances:
            requests.post("{}/Pod/{}/remove".format(api_server_url, pod_instance_name),
                          json=json.dumps(dict()))
        return "Successfully delete job {}".format(instance_name), 200
    elif behavior == 'start':
        job_config = get(instance_name, assert_exist=False)
        if job_config is None:
            return 'Not found ', 404
        job_pod_config: dict = job_config.copy()
        job_pod_config['kind'] = 'Pod'
        job_pod_config['isGPU'] = True
        r = requests.post("{}/Pod".format(api_server_url), json=json.dumps(job_pod_config))
        if r.status_code == 200:
            pod_config = json.loads(r.content.decode())
            pod_instance_name = pod_config['instance_name']
            # add the pod into the pod_instances list
            job_config['pod_instances'].append(pod_instance_name)
            put(instance_name, job_config)
            return pod_instance_name, 200
        else:
            return "Wait for container build", 300
    elif behavior == 'submit':
        job_config = get(instance_name, assert_exist=False)
        if job_config is None:
            return 'Not found ', 404
        if len(job_config['pod_instances']) < 1:
            return 'Wait for Pod start', 300
        job_config['last_receive_time'] = time.time()
        put(instance_name, job_config)
        pod_instance_name = job_config['pod_instances'][0]
        pod_config = get(pod_instance_name)
        pod_config['last_submitted_time'] = time.time()
        put(pod_instance_name, pod_config)
        pod_ip = pod_config['ip']
        pod_url = "http://" + pod_ip + ':5054/submit'
        upload_config = {"module_name": instance_name}
        r = requests.post(pod_url, json=json.dumps(upload_config))
        if r.status_code == 200:
            result = json.loads(r.content.decode())
            return json.dumps(result), 200
        else:
            return "Submit error", 400
    elif behavior == 'download':
        job_config = get(instance_name, assert_exist=False)
        if job_config is None:
            return 'Not found ', 404
        if len(job_config['pod_instances']) < 1:
            return 'Wait for Pod start', 300
        job_config['last_receive_time'] = time.time()
        put(instance_name, job_config)
        pod_instance_name = job_config['pod_instances'][0]
        pod_config = get(pod_instance_name)
        pod_config['last_submitted_time'] = time.time()
        put(pod_instance_name, pod_config)
        pod_ip = pod_config['ip']
        pod_url = "http://" + pod_ip + ':5054/download'
        upload_config = {"module_name": instance_name}
        r = requests.post(pod_url, json=json.dumps(upload_config))
        if r.status_code == 200:
            result = json.loads(r.content.decode())
            return json.dumps(result), 200
        else:
            return "Download error", r.status_code
    # unknown behavior: same answer post_pod gives
    return json.dumps(dict()), 404


@app.route('/Function', methods=['POST'])
def upload_function():
    """Store a posted function; re-uploading a name removes the old pods."""
    json_data = request.json
    function_config: dict = json.loads(json_data)
    function_name = function_config['name']
    function_config['created_time'] = time.time()
    function_config['status'] = 'Uploaded'
    function_config['requirement_status'] = 'Not Found'
    function_config['pod_instances'] = list()
    functions_list: list = get('functions_list')
    if function_name not in functions_list:
        functions_list.append(function_name)
    else:  # replace the old one
        old_function_config: dict = get(function_name)
        for old_pod_instance_name in old_function_config['pod_instances']:
            requests.post("{}/Pod/{}/remove".format(api_server_url, old_pod_instance_name))
    put('functions_list', functions_list)
    put(function_name, function_config)  # replace the old one if exist
    return json.dumps(get('functions_list')), 200


def add_function_pod_instance(function_instance_name, function_config: dict):
    """Create one pod for a function and record it in ``pod_instances``.

    Posts a copy of ``function_config`` (with kind 'Pod') to ``/Pod``. On a
    200 answer the new pod name is appended to
    ``function_config['pod_instances']``, the config is stored and the pod
    name is returned; otherwise ``None`` is returned.
    """
    upload_config: dict = function_config.copy()
    upload_config['kind'] = 'Pod'
    upload_config['last_activated_time'] = time.time()
    r = requests.post("{}/Pod".format(api_server_url), json=json.dumps(upload_config))
    if r.status_code != 200:
        return None
    pod_config = json.loads(r.content.decode())
    pod_instance_name = pod_config['instance_name']
    # add the pod into the pod_instances list
    function_config['pod_instances'].append(pod_instance_name)
    put(function_instance_name, function_config)
    return pod_instance_name


@app.route('/Function/<string:instance_name>/<string:behavior>', methods=['POST'])
def handle_function(instance_name: str, behavior: str):
    """Dispatch a serverless-function action.

    Supported behaviors: 'upload_requirement', 'delete', 'start' (debug),
    'activate' and 'add_instance'. Every path returns a Flask response; an
    unknown function or behavior answers 404.
    """
    json_data = request.json
    config: dict = json.loads(json_data)
    if behavior == 'upload_requirement':
        function_config = get(instance_name, assert_exist=False)
        if function_config is None:
            return "Not found", 404
        function_config['requirement'] = config['requirement']
        function_config['status'] = 'Uploaded'
        function_config['requirement_status'] = 'Uploaded'
        put(instance_name, function_config)  # replace the old one if exist
        return "Successfully upload requirement for {}".format(instance_name), 200
    elif behavior == 'delete':
        function_list: list = get('functions_list')
        if instance_name not in function_list:
            return "Not found", 404
        # functions_list only holds names; the pod list lives in the stored config.
        function_config = get(instance_name, assert_exist=False)
        function_list.remove(instance_name)
        put('functions_list', function_list)
        pod_instances = function_config.get('pod_instances', list()) if function_config else list()
        for pod_instance_name in pod_instances:
            requests.post("{}/Pod/{}/remove".format(api_server_url, pod_instance_name),
                          json=json.dumps(dict()))
        return "Successfully delete function {}".format(instance_name), 200
    elif behavior == 'start':  # debug only
        function_config = get(instance_name, assert_exist=False)
        if function_config is None:
            return "Not found", 404
        function_config['kind'] = 'Pod'
        requests.post("{}/Pod".format(api_server_url), json=json.dumps(function_config))
        return json.dumps(config), 200
    elif behavior == 'activate':
        function_config = get(instance_name, assert_exist=False)
        if function_config is None:
            return 'Not found ', 404
        valid_pod_instance_names = list()
        for pod_instance_name in function_config['pod_instances']:
            pod_instance_config = get(pod_instance_name, assert_exist=False)
            if pod_instance_config:
                if pod_instance_config['status'] == 'Running' and 'ip' in pod_instance_config:
                    valid_pod_instance_names.append(pod_instance_name)
        if len(valid_pod_instance_names) == 0:
            # if no instance exist, cold start
            pod_instance_name = add_function_pod_instance(instance_name, function_config.copy())
            if pod_instance_name:
                valid_pod_instance_names.append(pod_instance_name)
            else:
                return "Serverless Pod build error", 300
        # config is the parameter yaml, which is the context dict
        function_config['last_receive_time'] = time.time()
        put(instance_name, function_config)
        # random pick
        pod_instance_name = random.choice(valid_pod_instance_names)
        pod_config = get(pod_instance_name)
        pod_config['last_activated_time'] = time.time()
        put(pod_instance_name, pod_config)
        # NOTE(review): a pod that was just cold-started may not have an 'ip'
        # yet, in which case this lookup raises KeyError -- confirm.
        pod_ip = pod_config['ip']
        context = config
        function_name = context['function_name']
        pod_url = "http://" + pod_ip + ':5052/function/module/{}'.format(function_name)
        r = requests.post(pod_url, json=json.dumps(context))
        if r.status_code == 200:
            result = json.loads(r.content.decode())
            result['ip'] = pod_ip
            return json.dumps(result), 200
        else:
            return "Activate error", 400
    elif behavior == 'add_instance':
        function_config = get(instance_name, assert_exist=False)
        if function_config is None:
            return 'Not found ', 404
        pod_instance_name = add_function_pod_instance(instance_name, function_config.copy())
        if pod_instance_name is None:
            # a view must not return None; report the failure like 'activate' does
            return "Serverless Pod build error", 300
        return pod_instance_name, 200
    # unknown behavior: same answer post_pod gives
    return json.dumps(dict()), 404


@app.route('/DAG', methods=['GET'])
def get_dags():
    """Return the DAG name list plus every DAG config that still exists."""
    dag_list = get('dag_list')
    result = dict()
    result['dag_list'] = dag_list
    for dag_name in dag_list:
        dag_config = get(dag_name, assert_exist=False)
        if dag_config:
            result[dag_name] = dag_config
    return json.dumps(result), 200


@app.route('/DAG/<string:dag_name>', methods=['GET'])
def get_dag(dag_name: str):
    """Return one DAG config as JSON (``null`` when the DAG is unknown)."""
    dag = get(dag_name, assert_exist=False)
    return json.dumps(dag), 200


def build_DAG_from_dict(dag_dict: dict):
    """Build a ``DAG`` from a dict with elements, branch conditions and names.

    ``dag_dict`` must contain 'elements', 'branch_condition' and 'name_data'.
    Elements with a 'position' key are treated as nodes, elements with a
    'source' key as edges. Returns ``None`` when a required key is missing,
    a node cannot be built, or an element is neither node nor edge.
    """
    required_keys = ('elements', 'branch_condition', 'name_data')
    if any(key not in dag_dict for key in required_keys):
        return None
    elements = dag_dict['elements']
    branch_condition = dag_dict['branch_condition']
    name_data = dag_dict['name_data']
    node_id_to_name_dict = dict()
    for name in name_data:  # ID to real user-defined name
        node_id_to_name_dict[name[0]] = name[1]['label']
    node_list = list()
    node_dict = dict()
    edge_list = list()
    edge_dict = dict()
    for element in elements:
        element_id = element['id']
        if 'position' in element:  # node
            serverless_function = ServerlessFunction.from_dict(element, node_name=node_id_to_name_dict[element_id])
            if serverless_function:
                node_list.append(serverless_function)
                node_dict[element_id] = serverless_function
            else:
                return None
        elif 'source' in element:  # edge
            edge = Edge.from_dict(element, node_dict)
            if edge:
                edge_list.append(edge)
                edge_dict[element_id] = edge
        else:
            return None
    for edge in edge_list:
        print("edge source = {}, target = {}".format(edge.source, edge.target))
        edge.source.add_out_edge(edge)
        edge_index = edge.index
        if edge_index in branch_condition:
            edge.update_condition(branch_condition[edge_index])
    my_dag = DAG.from_node_list_and_edge_list(node_list, edge_list)
    for node in node_list:
        print("type = {}, node_name = {}, module = {}, function ={}".format(node.node_type, node.name, node.module_name, node.function_name))
        for edge in node.out_edge:
            print("out node = ", edge.target.name)
    return my_dag


# NOTE(review): the remainder of this handler lies beyond the visible excerpt;
# the visible portion is reproduced unchanged apart from restored formatting.
@app.route('/DAG/<string:dag_name>/<string:behavior>', methods=['POST'])
def handle_DAG(dag_name: str, behavior: str):
    if behavior == 'upload':
        elements = json.loads(request.form.get('elements'))
        branch_condition = json.loads(request.form.get('localStorage'))
        name_data = json.loads(request.form.get("flowData"))['value']
        dag_dict = {'elements': elements, 'branch_condition': branch_condition, 'name_data': name_data}
        my_dag = build_DAG_from_dict(dag_dict)  # is not none means no serious error
        if my_dag:
            dag_list: list = get('dag_list')
            flag = 0
            for name in dag_list:
                if name == dag_name:
                    flag = 1
            if not flag:
                dag_list.append(dag_name)
            dag_dict['status'] = 'Uploaded'
            dag_dict['initial_parameter_status'] = 'Not Found'
            dag_dict['initial_parameter'] = dict()
            put('dag_list', dag_list)
            put(dag_name, dag_dict)  # replace the old one
            return "Successfully save dag", 200
        else:
            return "Save DAG Error", 500
    elif behavior == 'upload_initial_parameter':
        initial_parameter: dict = json.loads(request.json)
        dag_config: dict = get(dag_name)
        dag_config['initial_parameter'] = initial_parameter
        dag_config['initial_parameter_status'] = 'Uploaded'
        put(dag_name, dag_config)
    elif behavior == 'run':
        dag_config = get(dag_name)
        my_dag: DAG = build_DAG_from_dict(dag_config)
        start_node = my_dag.start_node
        end_node: ServerlessFunction = my_dag.end_node
        print("my_dag = ", my_dag)
        print("my_dag = ", my_dag)
        current_node = start_node.out_edge[0].target
        prev_node_result = dag_config['initial_parameter']
        while current_node != end_node:
            parameters: dict = prev_node_result
            parameters['function_name'] = current_node.function_name
            print("parameters = ", parameters)
            # first: run the current node and get result
            module_name = current_node.module_name
            function_instance_name = 'serverless-' + current_node.module_name
            r = requests.post('{}/Function/{}/activate'.format(api_server_url, function_instance_name),
                              json=json.dumps(parameters))
            if r.status_code != 200:
                return "{}.{} activation error!".format(module_name, current_node.function_name), 500
            prev_node_result: dict = json.loads(r.content.decode())
            success = 0
            current_out_edge = current_node.out_edge
            for edge in current_out_edge:
                condition = edge.condition
            print("try to eval condition = ", condition)793                if eval(condition):794                    success = 1795                    current_node = edge.target796                    break797            if success == 0:798                return "Not node match the condition! ", 500799        return json.dumps(prev_node_result), 200800@app.route('/heartbeat', methods=['POST'])801def receive_heartbeat():802    # 收到node发送的心跳包803    json_data = request.json804    heartbeat: dict = json.loads(json_data)805    heartbeat['last_receive_time'] = time.time()806    node_instance_name = heartbeat['instance_name']807    # node_config = get(node_instance_name)808    # if node_config['status'] == 'Not Available':809    #     # we get the heartbeat of a lost node again810    #     node_config['status'] = 'Running'811    for pod_instance_name in heartbeat['pod_instances']:812        pod_heartbeat = heartbeat[pod_instance_name]813        pod_config = get(pod_instance_name, assert_exist=False)814        if pod_config:815            if pod_config['status'] != 'Removed':816                pod_config['status'] = pod_heartbeat['status']817            pod_config['cpu_usage_percent'] = pod_heartbeat['cpu_usage_percent']818            pod_config['memory_usage_percent'] = pod_heartbeat['memory_usage_percent']819            pod_config['ip'] = pod_heartbeat['ip']820            pod_config['volume'] = pod_heartbeat['volume']821            pod_config['ports'] = pod_heartbeat['ports']822            pod_config['node'] = node_instance_name823            pod_config['container_names'] = pod_heartbeat['container_names']824            put(pod_instance_name, pod_config)825        heartbeat.pop(pod_instance_name)  # the information is of no use826    nodes_list = get('nodes_list')827    flag = 0828    for name in nodes_list:829        if name == node_instance_name:830            flag = 1831    if not flag:832        nodes_list.append(node_instance_name)833    put('nodes_list', 
nodes_list)834    put(node_instance_name, heartbeat)835    return json.dumps(heartbeat), 200836def main():837    init_api_server()838    app.run(host='0.0.0.0', port=5050, processes=True)839if __name__ == '__main__':...

Full Screen

Full Screen

test_course_basic.py

Source:test_course_basic.py Github

copy

Full Screen

...21        logger.info("结束基础课程页面")22    @allure.story("进入送花列表")23    def test_course_flow_list(self, init_course):24        init_course.course_basic_flow_list()25        self.base.assert_exist("给老师送花")26    @allure.story("客服引导公众号关注")27    def test_course_customer_ad(self, init_course):28        init_course.course_basic_customer_ad()29        self.base.assert_exist("客服会话")30    @allure.story("进入基本设置")31    def test_course_settings(self, init_course):32        init_course.course_basic_settings()33        self.base.click("邀请打卡", "设置")34        self.base.click_advance({"location": (0.892, 0.296), "description": "切换点评消息提醒"})35        self.base.assert_exist("群主点评消息提醒")36        self.base.click("保存并退出 ", "保存并退出")37        self.base.assert_not_exist("群主点评消息提醒")38    @allure.story("进入打卡")39    def test_course_enter_punch(self, init_course):40        init_course.course_basic_punch()41        self.base.assert_exist("录音")42        self.base.assert_exist("图片")43        self.base.assert_exist("视频")44        self.base.send_keys("记录今天的感想和收获吧~", "测试打卡内容", "测试打卡内容")45        self.base.back()46        self.base.click("发表日志", "发表日志")47        self.base.assert_exist("删除")48    @allure.story("送花")49    def test_course_send_flow(self, init_course):50        init_course.course_basic_send_flow()51        self.base.click("送出", "送出")52        self.base.wait_element_gone("绘制送花图片中", "绘制送花图片中", 10)53        self.base.click("确认送出", "确认送出")54        self.base.click("com.tencent.mm:id/az_", "确认")55        self.base.assert_exist("送花")56    @allure.story("进入消息中心")57    def test_course_message(self, init_course):58        init_course.course_basic_message()59        self.base.click("点赞评论(0)", "点赞评论(0)")60        self.base.click("系统消息(0)", "系统消息(0)")61        self.base.assert_exist("暂无未读消息")62    @allure.story("进入排行榜")63    def test_course_charts(self, init_course):64        init_course.course_basic_charts()65        self.base.assert_exist("任天野")66        self.base.click("鲜花榜", "鲜花榜")67        
self.base.assert_exist("我也去送花")68        self.base.click("勋章榜", "勋章榜")69        self.base.assert_exist("赶紧成为排行榜第一名")70    @allure.story("用户分享")71    def test_course_share_button(self, init_course):72        init_course.course_basic_share_button()73        self.base.assert_exist("创建新聊天")74    # @allure.story("筛选")                         # 管理员处回归75    # def test_course_filter(self, init_course):76    #     init_course.course_basic_filter()77    @allure.story("更多课程")78    def test_course_more_course(self, init_course):79        init_course.course_basic_more_course()80        self.base.assert_exist("家庭教育收费班级")81    @allure.story("快捷导航")82    def test_course_navigate(self, init_course):83        init_course.course_basic_navigate()84        self.base.click_advance({"location": (0.912, 0.742), "description": "导航去送花"})85        self.base.assert_exist("添加祝福语")86    @allure.story("发表评论")87    def test_course_comment(self, init_course):88        self.base.wait_time(1)89        init_course.course_basic_comment()90        self.base.wait_time(1)91        self.base.send_keys("写评论", "这是评论", "评论")92        self.base.back()93        self.base.wait_time(1)94        self.base.click("发表评论 ", "发表评论")95        self.base.assert_exist("Rhapsody")96    @allure.story("删除评论")97    def test_delete_course_comment(self, init_course):98        self.base.click("Rhapsody", "Rhapsody")99        self.base.click_advance({"location": (0.506, 0.901), "description": "删除评论"})100        self.base.click("com.tencent.mm:id/az_", "确认删除")101    @allure.story("点赞")102    def test_course_praise(self,init_course):103        self.base.wait_time(1)104        init_course.course_basic_praise()105        self.base.assert_exist("点赞成功 +1分")106    @allure.story("日记分享")107    def test_course_diary_share(self, init_course):108        init_course.course_basic_diary_share()109        self.base.assert_exist("创建新聊天")110    @allure.story("查看今日打卡内容")111    def test_course_basic_diary_view(self, init_course):112        
init_course.course_basic_diary_view()...

Full Screen

Full Screen

test_mine.py

Source:test_mine.py Github

copy

Full Screen

...22    @allure.story("我的账户")23    def test_mine_balance(self, init_mine):24        init_mine.mine_balance()25        init_mine.mine_diamont_pay()26        self.base.assert_exist("请输入支付密码")27    @allure.story("会员权益")28    def test_mine_rights(self, init_mine):29        init_mine.mine_rights("专属花朵")30        self.base.assert_exist("会员定制花朵 任性送老师")31        self.base.click("✖", "✖")32        # TODO 写法有点丑 后续封装下33        # init_mine.mine_rights("AI评测")34        # self.base.assert_exist("孩子的专属智能 陪伴小助手")35        # self.base.click("✖", "✖")36        # init_mine.mine_rights("专属皮肤")37        # self.base.assert_exist("会员标识及皮肤 彰显与众不同")38        # self.base.click("✖", "✖")39        # init_mine.mine_rights("优惠入班")40        # self.base.assert_exist("会员购买打卡课程 可享受专属折扣")41        # self.base.click("✖", "✖")42        # init_mine.mine_rights("记录下载")43        # self.base.assert_exist("学习打卡记录 随时下载")44    45    @allure.story("我的评论")46    def test_mine_comment(self, init_mine):47        init_mine.mine_notebook()48        init_mine.mine_comment()49        self.base.assert_exist("这是评论")50    @allure.story("我的赞")51    def test_mine_priaise(self, init_mine):52        init_mine.mine_notebook()53        init_mine.mine_priaise()54        self.base.assert_exist("图为证")55    @allure.story("日记本")56    def test_mine_notebook(self, init_mine):57        init_mine.mine_notebook()58        self.base.click("群名:图", "群名:图")59        self.base.assert_exist("2019.04.08")60    @allure.story("优惠券")61    def test_mine_coupon(self, init_mine):62        init_mine.mine_coupon()63        self.base.click("立刻使用", "立刻使用")64        self.base.assert_exist("家庭教育收费班级")65    @allure.story("我的-礼物兑换")66    def test_mine_gift(self, init_mine):67        init_mine.mine_gift()68        self.base.assert_exist("礼物兑换")69    @allure.story("我的-排行榜")70    @pytest.mark.parametrize("_type", ["积分榜", "鲜花榜", "勋章榜"])71    def test_mine_chart(self, init_mine, _type):72        init_mine.mine_chart()73        self.base.assert_exist("第1名")74   
     self.base.assert_exist("一年级一班")75        self.base.click(_type, _type)76        if _type == "积分榜":77            self.base.assert_exist("实验三小2018级3班")78        else:79            self.base.assert_exist("Linda")80    @allure.story("我的-分享小程序")81    def test_mine_share(self, init_mine):82        init_mine.mine_share()83        self.base.assert_exist("创建新聊天")84    @allure.story("我的-帮助")85    def test_mine_help(self, init_mine):86        init_mine.mine_help()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful