How to use assert_exist method in Airtest

Best Python code snippet using Airtest

api_server.py

Source:api_server.py Github

copy

Full Screen

1import random2import time3from flask import Flask, request4import json5import uuid6import etcd37import sys8import os9import requests10import logging11BASE_DIR = os.path.dirname(os.path.abspath(__file__))12ROOT_DIR = os.path.join(BASE_DIR, os.path.pardir)13sys.path.append(os.path.join(BASE_DIR, '../helper'))14import utils, const, yaml_loader15from serverless import ServerlessFunction, Edge, DAG16sys.path.append(os.path.join(BASE_DIR, '../worker'))17from entities import parse_bytes18import etcd_controller19app = Flask(__name__)20# CORS(app, supports_credentials=True)21use_etcd = False # True22etcd = etcd3.client(port=2379)23etcd_supplant = dict()24api_server_url = None25def get(key, assert_exist=True):26 if use_etcd:27 value, _ = etcd.get(key)28 if value:29 return json.loads(value)30 else:31 if etcd_supplant.__contains__(key):32 return json.loads(etcd_supplant[key]) # force deep copy here33 # key not exist34 if assert_exist:35 raise FileNotFoundError36 return None37def put(key, value):38 if use_etcd:39 etcd.put(key, json.dumps(value))40 else:41 etcd_supplant[key] = json.dumps(value) # force deep copy here42def get_api_server_url():43 return get('api_server_url')44def init_api_server():45 global api_server_url46 # start etcd47 f = open(const.MASTER_API_SERVER_URL_PATH, 'r')48 API_SERVER_URL = str(f.read())49 print(API_SERVER_URL)50 f.close()51 # for the very first start, init etcd52 put('api_server_url', API_SERVER_URL)53 api_server_url = get('api_server_url')54 if get('nodes_list', assert_exist=False) is None:55 put('nodes_list', list())56 if get('pods_list', assert_exist=False) is None:57 put('pods_list', list())58 if get('services_list', assert_exist=False) is None:59 put('services_list', list())60 if get('replica_sets_list', assert_exist=False) is None:61 put('replica_sets_list', list())62 if get('dag_list', assert_exist=False) is None:63 put('dag_list', list())64 if get('functions_list', assert_exist=False) is None:65 put('functions_list', list())66 if get('hpa_list', assert_exist=False) is None:67 put('hpa_list', list())68 if get('jobs_list', assert_exist=False) is None:69 put('jobs_list', list())70 if get('dns_list', assert_exist=False) is None:71 put('dns_list', list())72 if get('dns_config', assert_exist=False) is None:73 put('dns_config', dict())74def delete_key(key):75 if use_etcd:76 etcd.delete(key)77 else:78 etcd_supplant.pop(key)79@app.route('/Node', methods=['GET'])80def handle_nodes():81 result = dict()82 result['nodes_list']: list = get('nodes_list')83 for node_instance_name in result['nodes_list']:84 result[node_instance_name] = get(node_instance_name)85 try: # we allow timeout86 r = requests.get(url=result[node_instance_name]['url'] + "/heartbeat", timeout=0.01)87 except Exception as e:88 pass89 return json.dumps(result)90@app.route('/Node', methods=['POST'])91def upload_nodes():92 json_data = request.json93 node_config: dict = json.loads(json_data)94 node_instance_name = node_config['instance_name']95 node_config['last_receive_time'] = time.time()96 node_config['status'] = 'READY TO START'97 # print("node_config = ", node_config)98 nodes_list: list = get('nodes_list')99 # node instance name bind with physical mac address100 flag = 0101 for name in nodes_list:102 if name == node_instance_name:103 flag = 1104 if not flag:105 nodes_list.append(node_instance_name)106 put('nodes_list', nodes_list)107 put(node_instance_name, node_config)108 return json.dumps(get('nodes_list')), 200109@app.route('/Node/<string:node_instance_name>', methods=['DELETE'])110def delete_node(node_instance_name: str):111 # set node to not available112 node_instance_config = get(node_instance_name)113 node_instance_config['status'] = 'Not Available'114 put(node_instance_name, node_instance_config) # save changed status115 # set pod to lost connect116 pods_list = get('pods_list')117 for pod_instance_name in pods_list:118 pod_config = get(pod_instance_name)119 if pod_config.__contains__('node') and pod_config['node'] == node_instance_name:120 pod_config['status'] = 'Lost Connect'121 put(pod_instance_name, pod_config) # save changed status122 return json.dumps(get('nodes_list')), 200123@app.route('/Node/<string:node_instance_name>', methods=['GET'])124def get_node_instance(node_instance_name: str):125 # set node to not available126 node_instance_config = get(node_instance_name, assert_exist=False)127 if node_instance_config:128 if node_instance_config.get('pod_instances'):129 for pod_instance_name in node_instance_config['pod_instances']:130 node_instance_config[pod_instance_name] = get(pod_instance_name)131 return json.dumps(node_instance_config), 200132 else:133 return json.dumps(dict()), 200134@app.route('/', methods=['GET'])135def get_all():136 return json.dumps('not good'), 200137@app.route('/Pod', methods=['GET'])138def get_pods():139 result = dict()140 result['pods_list'] = get('pods_list')141 for pod_instance_name in result['pods_list']:142 pod_config = get(pod_instance_name, assert_exist=False)143 if pod_config:144 result[pod_instance_name] = pod_config145 return json.dumps(result), 200146@app.route('/Service', methods=['GET'])147def get_services():148 result = dict()149 result['services_list'] = get('services_list')150 for service_instance_name in result['services_list']:151 service_config = get(service_instance_name, assert_exist=False)152 # print("Service = ", service_instance_name)153 if service_config:154 result[service_instance_name] = service_config155 return json.dumps(result), 200156@app.route('/ReplicaSet', methods=['GET'])157def get_replica_set():158 result = dict()159 result['replica_sets_list'] = get('replica_sets_list')160 for replica_set_instance in result['replica_sets_list']:161 config = get(replica_set_instance)162 result[replica_set_instance] = config163 for pod_instance_name in config['pod_instances']:164 pod_config = get(pod_instance_name, assert_exist=False)165 if pod_config:166 result[pod_instance_name] = pod_config167 # print(result)168 return json.dumps(result), 200169@app.route('/HorizontalPodAutoscaler', methods=['GET'])170def get_hpa():171 result = dict()172 result['hpa_list'] = get('hpa_list')173 for hpa_instance in result['hpa_list']:174 config = get(hpa_instance)175 result[hpa_instance] = config176 return json.dumps(result), 200177@app.route('/HorizontalPodAutoscaler', methods=['POST'])178def upload_hpa():179 json_data = request.json180 HPA_config: dict = json.loads(json_data)181 assert HPA_config['kind'] == 'HorizontalPodAutoscaler'182 replica_set_instance_name = HPA_config['name'] + uuid.uuid1().__str__()183 replica_set_config = HPA_config184 replica_set_config['instance_name'] = replica_set_instance_name185 replica_set_config['spec'] = dict()186 replica_set_config['spec']['replicas'] = replica_set_config['minReplicas']187 replica_set_config['kind'] = 'ReplicaSet'188 replica_set_config['created_time'] = time.time()189 replica_set_config['last_change_time'] = time.time()190 replica_set_config['isHPA'] = True191 replica_set_config['pod_instances'] = list()192 hpa_list: list = get('hpa_list')193 hpa_list.append(replica_set_instance_name)194 put('hpa_list', hpa_list)195 replica_sets_list: list = get('replica_sets_list')196 replica_sets_list.append(replica_set_instance_name)197 put('replica_sets_list', replica_sets_list)198 put(replica_set_instance_name, replica_set_config)199 return "Successfully create HPA set instance {}".format(replica_set_instance_name), 200200@app.route('/Dns', methods=['GET'])201def get_dns():202 result = dict()203 result['dns_list'] = get('dns_list')204 for dns_instance_name in result['dns_list']:205 dns_instance_config = get(dns_instance_name, assert_exist=False)206 if dns_instance_config:207 result[dns_instance_name] = dns_instance_config208 return json.dumps(result), 200209@app.route('/Dns/Config', methods=['GET'])210def get_dns_config():211 result = get('dns_config')212 return json.dumps(result), 200213@app.route('/Dns/Config', methods=['POST'])214def post_dns_config():215 json_data = request.json216 new_config: dict = json.loads(json_data)217 config: dict = get('dns_config')218 for key, item in new_config.items():219 config[key] = item220 put('dns_config', config)221 return json.dumps(config)222@app.route('/Pod', methods=['POST'])223def post_pods():224 json_data = request.json225 config: dict = json.loads(json_data)226 assert config['kind'] == 'Pod'227 object_name = config['name']228 instance_name = config['name'] + uuid.uuid1().__str__()229 config['instance_name'] = instance_name230 config['status'] = 'Wait for Schedule'231 config['created_time'] = time.time()232 config['strategy'] = config['strategy'] if config.get('strategy') is not None else 'roundrobin'233 # print("create {}".format(instance_name))234 pods_list: list = get('pods_list')235 pods_list.append(instance_name)236 put('pods_list', pods_list)237 put(instance_name, config)238 schedule(config)239 return json.dumps(config), 200240def schedule(config):241 if config['kind'] == 'Pod' and config['status'] == 'Wait for Schedule':242 r = requests.get(url='{}/Node'.format(api_server_url))243 nodes_list = get('nodes_list')244 instance_name = config['instance_name']245 mem_need = parse_bytes(config['mem'])246 config['status'] = 'Schedule Failed'247 if len(nodes_list) == 0:248 print("no node registered !")249 strategy = config['strategy'] if config.get('strategy') is not None else 'roundrobin'250 index_list = list()251 if strategy == 'roundrobin':252 schedule_counts = get('schedule_counts', assert_exist=False)253 if schedule_counts is None or schedule_counts > 10:254 put('schedule_counts', 0)255 else:256 put('schedule_counts', schedule_counts + 1)257 schedule_counts = get('schedule_counts', assert_exist=True)258 index = schedule_counts % len(nodes_list) if len(nodes_list) > 0 else 0259 for i in range(index, len(nodes_list)):260 index_list.append(i)261 for i in range(0, index):262 index_list.append(i)263 elif strategy == 'random':264 for i in range(0, len(nodes_list)):265 index_list.append(i)266 random.shuffle(index_list)267 else:268 logging.warning('strategy not assigned...')269 for i in index_list:270 node_instance_name = nodes_list[i]271 # todo: check node status here272 current_node = get(node_instance_name)273 print("node status = ", current_node['status'])274 if current_node['status'] != 'Running':275 continue276 print("free_memory = {}, need_memory = {}".format(current_node['free_memory'], mem_need))277 if current_node['free_memory'] > mem_need:278 config['node'] = node_instance_name279 config['status'] = 'Ready to Create'280 break281 if config.__contains__('node'):282 print('把 pod {} 调度到节点 {} 上'.format(instance_name, config['node']))283 else:284 print("Schedule failure")285 url = "{}/Pod/{}/create".format(api_server_url, instance_name)286 json_data = json.dumps(config)287 # 向api_server发送调度结果288 r = requests.post(url=url, json=json_data)289@app.route('/ReplicaSet', methods=['POST'])290def upload_replica_set():291 json_data = request.json292 config: dict = json.loads(json_data)293 assert config['kind'] == 'ReplicaSet'294 assert config['spec']['replicas'] > 0295 replica_set_instance_name = config['name'] + uuid.uuid1().__str__()296 config['instance_name'] = replica_set_instance_name297 config['pod_instances'] = list()298 config['created_time'] = time.time()299 replica_sets_list: list = get('replica_sets_list')300 replica_sets_list.append(replica_set_instance_name)301 put('replica_sets_list', replica_sets_list)302 put(replica_set_instance_name, config)303 return "Successfully create replica set instance {}".format(replica_set_instance_name), 200304@app.route('/Service', methods=['POST'])305def upload_service():306 json_data = request.json307 config: dict = json.loads(json_data)308 assert config['kind'] == 'Service'309 service_instance_name = config['name'] + uuid.uuid1().__str__()310 config['instance_name'] = service_instance_name311 config['pod_instances'] = list()312 config['created_time'] = time.time()313 config['status'] = 'Created'314 services_list: list = get('services_list')315 services_list.append(service_instance_name)316 put('services_list', services_list)317 put(service_instance_name, config)318 url = '{}/Service/{}/{}'.format(api_server_url, service_instance_name, 'create')319 utils.post(url=url, config=config)320 return "Successfully create service instance {}".format(service_instance_name), 200321@app.route('/Service/<string:instance_name>/<string:behavior>', methods=['POST'])322def update_service(instance_name: str, behavior: str):323 json_data = request.json324 config: dict = json.loads(json_data)325 if behavior == 'create':326 config['status'] = 'Creating'327 elif behavior == 'update':328 config['status'] = 'Updating'329 elif behavior == 'restart':330 config['status'] = 'Restarting'331 elif behavior == 'running':332 config['status'] = 'Running'333 elif behavior == 'remove':334 config['status'] = 'Removing'335 elif behavior == 'none':336 config['status'] = 'None'337 put(instance_name, config)338 return "Successfully update service instance {}".format(instance_name), 200339@app.route('/Dns', methods=['POST'])340def upload_dns():341 json_data = request.json342 config: dict = json.loads(json_data)343 assert config['kind'] == 'Dns'344 dns_instance_name = config['name'] + uuid.uuid1().__str__()345 config['instance_name'] = dns_instance_name346 config['service_instances'] = list()347 config['created_time'] = time.time()348 dns_list: list = get('dns_list')349 dns_list.append(dns_instance_name)350 put('dns_list', dns_list)351 put(dns_instance_name, config)352 url = "{}/Dns/{}/{}".format(api_server_url, dns_instance_name, 'create')353 utils.post(url=url, config=config)354 return "Successfully create dns instance {}".format(dns_instance_name), 200355@app.route('/Dns/<string:instance_name>/<string:behavior>', methods=['POST'])356def update_dns(instance_name: str, behavior: str):357 json_data = request.json358 config: dict = json.loads(json_data)359 if behavior == 'create':360 config['status'] = 'Creating'361 elif behavior == 'update':362 config['status'] = 'Updating'363 elif behavior == 'restart':364 config['status'] = 'Restarting'365 elif behavior == 'running':366 config['status'] = 'Running'367 elif behavior == 'remove':368 config['status'] = 'Removing'369 elif behavior == 'none':370 config['status'] = 'None'371 put(instance_name, config)372 return "Successfully update dns instance {}".format(instance_name)373@app.route('/ReplicaSet/<string:instance_name>/', methods=['POST'])374def update_replica_set(instance_name: str):375 json_data = request.json376 config: dict = json.loads(json_data)377 config['status'] = 'Creating'378 put(instance_name, config)379 return "Successfully update replica set instance {}".format(instance_name), 200380@app.route('/Pod/<string:instance_name>', methods=['GET'])381def get_pod_instance(instance_name: str):382 return json.dumps(get(instance_name, assert_exist=False))383@app.route('/Pod/<string:instance_name>/<string:behavior>', methods=['POST'])384def post_pod(instance_name: str, behavior: str):385 if behavior == 'create':386 json_data = request.json387 config: dict = json.loads(json_data)388 put(instance_name, config)389 config['behavior'] = 'create'390 elif behavior == 'update': # update pods information such as ip391 json_data = request.json392 config: dict = json.loads(json_data)393 put(instance_name, config)394 return 'success', 200395 elif behavior == 'remove':396 config = get(instance_name)397 config['status'] = 'Removed'398 put(instance_name, config)399 config['behavior'] = 'remove'400 elif behavior == 'execute':401 config = get(instance_name)402 json_data = request.json403 upload_cmd: dict = json.loads(json_data)404 # if config.get('cmd') is not None and config['cmd'] != upload_cmd['cmd']:405 config['behavior'] = 'execute'406 config['cmd'] = upload_cmd['cmd']407 elif behavior == 'delete':408 pods_list: list = get('pods_list')409 index = -1410 for i, pod_instance_name in enumerate(pods_list):411 if pod_instance_name == instance_name:412 index = i413 pods_list.pop(index)414 delete_key(instance_name)415 put('pods_list', pods_list)416 return "success", 200417 else:418 return json.dumps(dict()), 404419 # todo: post pod information to related node420 worker_url = None421 pods_list = get('pods_list')422 for pod_instance_name in pods_list:423 if instance_name == pod_instance_name:424 pod_config = get(pod_instance_name)425 if pod_config.get('node') is not None:426 node_instance_name = pod_config['node']427 node_config = get(node_instance_name)428 worker_url = node_config['url']429 if worker_url is not None: # only scheduler successfully will have a worker_url430 r = requests.post(url=worker_url + "/Pod", json=json.dumps(config))431 # broadcast_message('Pod', config.__str__())432 return json.dumps(config), 200433@app.route('/Function', methods=['GET'])434def get_function():435 result = dict()436 result['functions_list'] = get('functions_list')437 for function_name in result['functions_list']:438 result[function_name] = get(function_name)439 return json.dumps(result), 200440@app.route('/Job', methods=['GET'])441def get_job():442 result = dict()443 result['jobs_list'] = get('jobs_list')444 for job_name in result['jobs_list']:445 result[job_name] = get(job_name)446 return json.dumps(result), 200447@app.route('/Job', methods=['POST'])448def upload_job():449 json_data = request.json450 job_config: dict = json.loads(json_data)451 job_name = job_config['name']452 job_config['created_time'] = time.time()453 job_config['status'] = 'Uploaded'454 job_config['files_list'] = list()455 job_config['pod_instances'] = list()456 jobs_list: list = get('jobs_list')457 # node instance name bind with physical mac address458 flag = 0459 for name in jobs_list:460 if name == job_name:461 flag = 1462 if not flag:463 jobs_list.append(job_name)464 else: # replace the old one465 old_job_config: dict = get(job_name)466 old_pod_instances = old_job_config['pod_instances']467 for old_pod_instance_name in old_pod_instances:468 r = requests.post("{}/Pod/{}/remove".format(api_server_url, old_pod_instance_name))469 put('jobs_list', jobs_list)470 put(job_name, job_config) # replace the old one if exist471 return json.dumps(get('jobs_list')), 200472@app.route('/Job/<string:instance_name>/<string:behavior>', methods=['POST'])473def handle_job(instance_name: str, behavior: str):474 json_data = request.json475 config: dict = json.loads(json_data)476 if behavior == 'upload_file':477 job_config = get(instance_name, assert_exist=False)478 if job_config is None:479 return "Not found", 404480 file_name = config['file_name']481 file_data = config['file_data']482 files_list: list = job_config['files_list']483 flag = 0484 for name in files_list:485 if name == file_name:486 flag = 1487 if not flag:488 files_list.append(file_name)489 job_config[file_name] = file_data490 put(instance_name, job_config)491 elif behavior == 'delete':492 jobs_list: list = get('jobs_list')493 match_id = -1494 for index, job_name in enumerate(jobs_list):495 if job_name == instance_name:496 match_id = index497 break498 if match_id != -1: # matched499 job_config = jobs_list[match_id]500 jobs_list.pop(match_id)501 put('jobs_list', jobs_list)502 for pod_instance_name in job_config['pod_instances']:503 r = requests.post("{}/Pod/{}/remove".format(api_server_url, pod_instance_name), json=json.dumps(dict()))504 elif behavior == 'start':505 job_config = get(instance_name, assert_exist=False)506 if job_config is None:507 return 'Not found ', 404508 job_pod_config: dict = job_config.copy()509 job_pod_config['kind'] = 'Pod'510 job_pod_config['isGPU'] = True511 # upload_config.pop('pod_instances')512 r = requests.post("{}/Pod".format(api_server_url), json=json.dumps(job_pod_config))513 if r.status_code == 200:514 pod_config = json.loads(r.content.decode())515 pod_instance_name = pod_config['instance_name']516 job_pod_config['pod_instances'].append(pod_instance_name)517 put(instance_name, job_config)518 # add the pod into the pod_instances list519 return pod_instance_name, 200520 else:521 return "Wait for container build", 300522 elif behavior == 'submit':523 job_config = get(instance_name, assert_exist=False)524 if job_config is None:525 return 'Not found ', 404526 if len(job_config['pod_instances']) < 1:527 return 'Wait for Pod start', 300528 job_config['last_receive_time'] = time.time()529 put(instance_name, job_config)530 if len(job_config['pod_instances']) < 1:531 return 'Not Found', 404532 pod_instance_name = job_config['pod_instances'][0]533 pod_config = get(pod_instance_name)534 pod_config['last_submitted_time'] = time.time()535 put(pod_instance_name, pod_config)536 pod_ip = pod_config['ip']537 pod_url = "http://" + pod_ip + ':5054/submit'538 upload_config = {"module_name": instance_name}539 r = requests.post(pod_url, json=json.dumps(upload_config))540 if r.status_code == 200:541 result = json.loads(r.content.decode())542 return json.dumps(result), 200543 else:544 return "Submit error", 400545 elif behavior == 'download':546 job_config = get(instance_name, assert_exist=False)547 if job_config is None:548 return 'Not found ', 404549 if len(job_config['pod_instances']) < 1:550 return 'Wait for Pod start', 300551 job_config['last_receive_time'] = time.time()552 put(instance_name, job_config)553 pod_instance_name = job_config['pod_instances'][0]554 pod_config = get(pod_instance_name)555 pod_config['last_submitted_time'] = time.time()556 put(pod_instance_name, pod_config)557 pod_ip = pod_config['ip']558 pod_url = "http://" + pod_ip + ':5054/download'559 upload_config = {"module_name": instance_name}560 r = requests.post(pod_url, json=json.dumps(upload_config))561 if r.status_code == 200:562 result = json.loads(r.content.decode())563 return json.dumps(result), 200564 else:565 return "Download error", r.status_code566@app.route('/Function', methods=['POST'])567def upload_function():568 json_data = request.json569 function_config: dict = json.loads(json_data)570 function_name = function_config['name']571 function_config['created_time'] = time.time()572 function_config['status'] = 'Uploaded'573 function_config['requirement_status'] = 'Not Found'574 function_config['pod_instances'] = list()575 # print("node_config = ", node_config)576 functions_list: list = get('functions_list')577 # node instance name bind with physical mac address578 flag = 0579 for name in functions_list:580 if name == function_name:581 flag = 1582 if not flag:583 functions_list.append(function_name)584 else: # replace the old one585 old_function_config: dict = get(function_name)586 old_pod_instances = old_function_config['pod_instances']587 for old_pod_instance_name in old_pod_instances:588 r = requests.post("{}/Pod/{}/remove".format(api_server_url, old_pod_instance_name))589 put('functions_list', functions_list)590 put(function_name, function_config) # replace the old one if exist591 return json.dumps(get('functions_list')), 200592def add_function_pod_instance(function_instance_name, function_config: dict):593 upload_config: dict = function_config.copy()594 upload_config['kind'] = 'Pod'595 upload_config['last_activated_time'] = time.time()596 # upload_config.pop('pod_instances')597 r = requests.post("{}/Pod".format(api_server_url), json=json.dumps(upload_config))598 if r.status_code == 200:599 pod_config = json.loads(r.content.decode())600 pod_instance_name = pod_config['instance_name']601 function_config['pod_instances'].append(pod_instance_name)602 put(function_instance_name, function_config)603 # add the pod into the pod_instances list604 return pod_instance_name605 else:606 return None607@app.route('/Function/<string:instance_name>/<string:behavior>', methods=['POST'])608def handle_function(instance_name: str, behavior: str):609 json_data = request.json610 config: dict = json.loads(json_data)611 if behavior == 'upload_requirement':612 function_config = get(instance_name, assert_exist=False)613 if function_config is None:614 return "Not found", 404615 function_config['requirement'] = config['requirement']616 function_config['status'] = 'Uploaded'617 function_config['requirement_status'] = 'Uploaded'618 put(instance_name, function_config) # replace the old one if exist619 elif behavior == 'delete':620 function_list: list = get('functions_list')621 match_id = -1622 for index, function_name in enumerate(function_list):623 if function_name == instance_name:624 match_id = index625 break626 if match_id != -1:627 function_config = function_list[match_id]628 function_list.pop(match_id)629 put('functions_list', function_list)630 for pod_instance_name in function_config['pod_instances']:631 r = requests.post("{}/Pod/{}/remove".format(api_server_url, pod_instance_name), json=json.dumps(dict()))632 elif behavior == 'start': # debug only633 function_config = get(instance_name, assert_exist=False)634 if function_config is None:635 return "Not found", 404636 function_config['kind'] = 'Pod'637 r = requests.post("{}/Pod".format(api_server_url), json=json.dumps(function_config))638 return json.dumps(config), 200639 elif behavior == 'activate':640 function_config = get(instance_name, assert_exist=False)641 if function_config is None:642 return 'Not found ', 404643 pod_instances: list = function_config['pod_instances']644 valid_pod_instance_names = list()645 for pod_instance_name in pod_instances:646 pod_instance_config = get(pod_instance_name, assert_exist=False)647 if pod_instance_config:648 if pod_instance_config['status'] == 'Running' and pod_instance_config.__contains__('ip'):649 valid_pod_instance_names.append(pod_instance_name)650 if len(valid_pod_instance_names) == 0:651 # if no instance exist, cold start652 pod_instance_name = add_function_pod_instance(instance_name, function_config.copy())653 if pod_instance_name:654 valid_pod_instance_names.append(pod_instance_name)655 else:656 return "Serverless Pod build error", 300657 # config is the parameter yaml, which is the context dict658 # random pick659 function_config['last_receive_time'] = time.time()660 put(instance_name, function_config)661 pod_instance_name = valid_pod_instance_names[random.randint(0, len(valid_pod_instance_names) - 1)]662 pod_config = get(pod_instance_name)663 pod_config['last_activated_time'] = time.time()664 put(pod_instance_name, pod_config)665 pod_ip = pod_config['ip']666 context = config667 function_name = context['function_name']668 pod_url = "http://" + pod_ip + ':5052/function/module/{}'.format(function_name)669 r = requests.post(pod_url, json=json.dumps(context))670 if r.status_code == 200:671 result = json.loads(r.content.decode())672 result['ip'] = pod_ip673 return json.dumps(result), 200674 else:675 return "Activate error", 400676 elif behavior == 'add_instance':677 function_config = get(instance_name, assert_exist=False)678 if function_config is None:679 return 'Not found ', 404680 return add_function_pod_instance(instance_name, function_config.copy())681@app.route('/DAG', methods=['GET'])682def get_dags():683 dag_list = get('dag_list')684 result = dict()685 result['dag_list'] = dag_list686 for dag_name in dag_list:687 dag_config = get(dag_name, assert_exist=False)688 if dag_config:689 result[dag_name] = dag_config690 return json.dumps(result), 200691@app.route('/DAG/<string:dag_name>', methods=['GET'])692def get_dag(dag_name: str):693 dag = get(dag_name, assert_exist=False)694 return json.dumps(dag), 200695def build_DAG_from_dict(dag_dict: dict):696 if not dag_dict.__contains__('elements') or not dag_dict.__contains__(697 'branch_condition') or not dag_dict.__contains__('name_data'):698 return None699 elements = dag_dict['elements']700 branch_condition = dag_dict['branch_condition']701 name_data = dag_dict['name_data']702 node_id_to_name_dict = dict()703 for name in name_data: # ID to real user-defined name704 node_id_to_name_dict[name[0]] = name[1]['label']705 node_list = list()706 node_dict = dict()707 edge_list = list()708 edge_dict = dict()709 for element in elements:710 element_id = element['id']711 if element.__contains__('position'): # node712 serverless_function = ServerlessFunction.from_dict(element, node_name=node_id_to_name_dict[element_id])713 if serverless_function:714 node_list.append(serverless_function)715 node_dict[element_id] = serverless_function716 else:717 return None718 elif element.__contains__('source'): # edge719 edge = Edge.from_dict(element, node_dict)720 if edge:721 edge_list.append(edge)722 edge_dict[element_id] = edge723 else:724 return None725 for edge in edge_list:726 print("edge source = {}, target = {}".format(edge.source, edge.target))727 edge.source.add_out_edge(edge)728 edge_index = edge.index729 if branch_condition.__contains__(edge_index):730 edge.update_condition(branch_condition[edge_index])731 my_dag = DAG.from_node_list_and_edge_list(node_list, edge_list)732 for node in node_list:733 print("type = {}, node_name = {}, module = {}, function ={}".format(node.node_type, node.name, node.module_name, node.function_name))734 for edge in node.out_edge:735 print("out node = ", edge.target.name)736 return my_dag737@app.route('/DAG/<string:dag_name>/<string:behavior>', methods=['POST'])738def handle_DAG(dag_name: str, behavior: str):739 if behavior == 'upload':740 elements = json.loads(request.form.get('elements'))741 branch_condition = json.loads(request.form.get('localStorage'))742 name_data = json.loads(request.form.get("flowData"))['value']743 dag_dict = {'elements': elements, 'branch_condition': branch_condition, 'name_data': name_data}744 my_dag = build_DAG_from_dict(dag_dict) # is not none means no serious error745 if my_dag:746 dag_list: list = get('dag_list')747 flag = 0748 for name in dag_list:749 if name == dag_name:750 flag = 1751 if not flag:752 dag_list.append(dag_name)753 dag_dict['status'] = 'Uploaded'754 dag_dict['initial_parameter_status'] = 'Not Found'755 dag_dict['initial_parameter'] = dict()756 put('dag_list', dag_list)757 put(dag_name, dag_dict) # replace the old one758 return "Successfully save dag", 200759 else:760 return "Save DAG Error", 500761 elif behavior == 'upload_initial_parameter':762 initial_parameter: dict = json.loads(request.json)763 dag_config: dict = get(dag_name)764 dag_config['initial_parameter'] = initial_parameter765 dag_config['initial_parameter_status'] = 'Uploaded'766 put(dag_name, dag_config)767 elif behavior == 'run':768 dag_config = get(dag_name)769 my_dag: DAG = build_DAG_from_dict(dag_config)770 start_node = my_dag.start_node771 end_node: ServerlessFunction = my_dag.end_node772 print("my_dag = ", my_dag)773 print("my_dag = ", my_dag)774 current_node = start_node.out_edge[0].target775 prev_node_result = dag_config['initial_parameter']776 while current_node != end_node:777 parameters: dict = prev_node_result778 parameters['function_name'] = current_node.function_name779 print("parameters = ", parameters)780 # first: run the current node and get result781 module_name = current_node.module_name782 function_instance_name = 'serverless-' + current_node.module_name783 r = requests.post('{}/Function/{}/activate'.format(api_server_url, function_instance_name),784 json=json.dumps(parameters))785 if r.status_code != 200:786 return "{}.{} activation error!".format(module_name, current_node.function_name), 500787 prev_node_result: dict = json.loads(r.content.decode())788 success = 0789 current_out_edge = current_node.out_edge790 for edge in current_out_edge:791 condition = edge.condition792 print("try to eval condition = ", condition)793 if eval(condition):794 success = 1795 current_node = edge.target796 break797 if success == 0:798 return "Not node match the condition! ", 500799 return json.dumps(prev_node_result), 200800@app.route('/heartbeat', methods=['POST'])801def receive_heartbeat():802 # 收到node发送的心跳包803 json_data = request.json804 heartbeat: dict = json.loads(json_data)805 heartbeat['last_receive_time'] = time.time()806 node_instance_name = heartbeat['instance_name']807 # node_config = get(node_instance_name)808 # if node_config['status'] == 'Not Available':809 # # we get the heartbeat of a lost node again810 # node_config['status'] = 'Running'811 for pod_instance_name in heartbeat['pod_instances']:812 pod_heartbeat = heartbeat[pod_instance_name]813 pod_config = get(pod_instance_name, assert_exist=False)814 if pod_config:815 if pod_config['status'] != 'Removed':816 pod_config['status'] = pod_heartbeat['status']817 pod_config['cpu_usage_percent'] = pod_heartbeat['cpu_usage_percent']818 pod_config['memory_usage_percent'] = pod_heartbeat['memory_usage_percent']819 pod_config['ip'] = pod_heartbeat['ip']820 pod_config['volume'] = pod_heartbeat['volume']821 pod_config['ports'] = pod_heartbeat['ports']822 pod_config['node'] = node_instance_name823 pod_config['container_names'] = pod_heartbeat['container_names']824 put(pod_instance_name, pod_config)825 heartbeat.pop(pod_instance_name) # the information is of no use826 nodes_list = get('nodes_list')827 flag = 0828 for name in nodes_list:829 if name == node_instance_name:830 flag = 1831 if not flag:832 nodes_list.append(node_instance_name)833 put('nodes_list', nodes_list)834 put(node_instance_name, heartbeat)835 return json.dumps(heartbeat), 200836def main():837 init_api_server()838 app.run(host='0.0.0.0', port=5050, processes=True)839if __name__ == '__main__':...

Full Screen

Full Screen

test_course_basic.py

Source:test_course_basic.py Github

copy

Full Screen

...21 logger.info("结束基础课程页面")22 @allure.story("进入送花列表")23 def test_course_flow_list(self, init_course):24 init_course.course_basic_flow_list()25 self.base.assert_exist("给老师送花")26 @allure.story("客服引导公众号关注")27 def test_course_customer_ad(self, init_course):28 init_course.course_basic_customer_ad()29 self.base.assert_exist("客服会话")30 @allure.story("进入基本设置")31 def test_course_settings(self, init_course):32 init_course.course_basic_settings()33 self.base.click("邀请打卡", "设置")34 self.base.click_advance({"location": (0.892, 0.296), "description": "切换点评消息提醒"})35 self.base.assert_exist("群主点评消息提醒")36 self.base.click("保存并退出 ", "保存并退出")37 self.base.assert_not_exist("群主点评消息提醒")38 @allure.story("进入打卡")39 def test_course_enter_punch(self, init_course):40 init_course.course_basic_punch()41 self.base.assert_exist("录音")42 self.base.assert_exist("图片")43 self.base.assert_exist("视频")44 self.base.send_keys("记录今天的感想和收获吧~", "测试打卡内容", "测试打卡内容")45 self.base.back()46 self.base.click("发表日志", "发表日志")47 self.base.assert_exist("删除")48 @allure.story("送花")49 def test_course_send_flow(self, init_course):50 init_course.course_basic_send_flow()51 self.base.click("送出", "送出")52 self.base.wait_element_gone("绘制送花图片中", "绘制送花图片中", 10)53 self.base.click("确认送出", "确认送出")54 self.base.click("com.tencent.mm:id/az_", "确认")55 self.base.assert_exist("送花")56 @allure.story("进入消息中心")57 def test_course_message(self, init_course):58 init_course.course_basic_message()59 self.base.click("点赞评论(0)", "点赞评论(0)")60 self.base.click("系统消息(0)", "系统消息(0)")61 self.base.assert_exist("暂无未读消息")62 @allure.story("进入排行榜")63 def test_course_charts(self, init_course):64 init_course.course_basic_charts()65 self.base.assert_exist("任天野")66 self.base.click("鲜花榜", "鲜花榜")67 self.base.assert_exist("我也去送花")68 self.base.click("勋章榜", "勋章榜")69 self.base.assert_exist("赶紧成为排行榜第一名")70 @allure.story("用户分享")71 def test_course_share_button(self, init_course):72 init_course.course_basic_share_button()73 self.base.assert_exist("创建新聊天")74 # @allure.story("筛选") # 管理员处回归75 # def test_course_filter(self, init_course):76 # init_course.course_basic_filter()77 @allure.story("更多课程")78 def test_course_more_course(self, init_course):79 init_course.course_basic_more_course()80 self.base.assert_exist("家庭教育收费班级")81 @allure.story("快捷导航")82 def test_course_navigate(self, init_course):83 init_course.course_basic_navigate()84 self.base.click_advance({"location": (0.912, 0.742), "description": "导航去送花"})85 self.base.assert_exist("添加祝福语")86 @allure.story("发表评论")87 def test_course_comment(self, init_course):88 self.base.wait_time(1)89 init_course.course_basic_comment()90 self.base.wait_time(1)91 self.base.send_keys("写评论", "这是评论", "评论")92 self.base.back()93 self.base.wait_time(1)94 self.base.click("发表评论 ", "发表评论")95 self.base.assert_exist("Rhapsody")96 @allure.story("删除评论")97 def test_delete_course_comment(self, init_course):98 self.base.click("Rhapsody", "Rhapsody")99 self.base.click_advance({"location": (0.506, 0.901), "description": "删除评论"})100 self.base.click("com.tencent.mm:id/az_", "确认删除")101 @allure.story("点赞")102 def test_course_praise(self,init_course):103 self.base.wait_time(1)104 init_course.course_basic_praise()105 self.base.assert_exist("点赞成功 +1分")106 @allure.story("日记分享")107 def test_course_diary_share(self, init_course):108 init_course.course_basic_diary_share()109 self.base.assert_exist("创建新聊天")110 @allure.story("查看今日打卡内容")111 def test_course_basic_diary_view(self, init_course):112 init_course.course_basic_diary_view()...

Full Screen

Full Screen

test_mine.py

Source:test_mine.py Github

copy

Full Screen

...22 @allure.story("我的账户")23 def test_mine_balance(self, init_mine):24 init_mine.mine_balance()25 init_mine.mine_diamont_pay()26 self.base.assert_exist("请输入支付密码")27 @allure.story("会员权益")28 def test_mine_rights(self, init_mine):29 init_mine.mine_rights("专属花朵")30 self.base.assert_exist("会员定制花朵 任性送老师")31 self.base.click("✖", "✖")32 # TODO 写法有点丑 后续封装下33 # init_mine.mine_rights("AI评测")34 # self.base.assert_exist("孩子的专属智能 陪伴小助手")35 # self.base.click("✖", "✖")36 # init_mine.mine_rights("专属皮肤")37 # self.base.assert_exist("会员标识及皮肤 彰显与众不同")38 # self.base.click("✖", "✖")39 # init_mine.mine_rights("优惠入班")40 # self.base.assert_exist("会员购买打卡课程 可享受专属折扣")41 # self.base.click("✖", "✖")42 # init_mine.mine_rights("记录下载")43 # self.base.assert_exist("学习打卡记录 随时下载")44 45 @allure.story("我的评论")46 def test_mine_comment(self, init_mine):47 init_mine.mine_notebook()48 init_mine.mine_comment()49 self.base.assert_exist("这是评论")50 @allure.story("我的赞")51 def test_mine_priaise(self, init_mine):52 init_mine.mine_notebook()53 init_mine.mine_priaise()54 self.base.assert_exist("图为证")55 @allure.story("日记本")56 def test_mine_notebook(self, init_mine):57 init_mine.mine_notebook()58 self.base.click("群名:图", "群名:图")59 self.base.assert_exist("2019.04.08")60 @allure.story("优惠券")61 def test_mine_coupon(self, init_mine):62 init_mine.mine_coupon()63 self.base.click("立刻使用", "立刻使用")64 self.base.assert_exist("家庭教育收费班级")65 @allure.story("我的-礼物兑换")66 def test_mine_gift(self, init_mine):67 init_mine.mine_gift()68 self.base.assert_exist("礼物兑换")69 @allure.story("我的-排行榜")70 @pytest.mark.parametrize("_type", ["积分榜", "鲜花榜", "勋章榜"])71 def test_mine_chart(self, init_mine, _type):72 init_mine.mine_chart()73 self.base.assert_exist("第1名")74 self.base.assert_exist("一年级一班")75 self.base.click(_type, _type)76 if _type == "积分榜":77 self.base.assert_exist("实验三小2018级3班")78 else:79 self.base.assert_exist("Linda")80 @allure.story("我的-分享小程序")81 def test_mine_share(self, init_mine):82 init_mine.mine_share()83 self.base.assert_exist("创建新聊天")84 @allure.story("我的-帮助")85 def test_mine_help(self, init_mine):86 init_mine.mine_help()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Airtest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful