How to use post_data_json method in Playwright Python

Best Python code snippet using playwright-python
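
In playwright-python, `post_data_json` is a property of the `Request` object: it returns the request's POST body parsed as JSON (or `None` when the request has no body). The snippets below come from real projects that use `post_data_json` as an ordinary variable or parameter name for JSON payloads; for the Playwright property itself, a minimal sketch (the target URL is a placeholder):

```python
from playwright.sync_api import sync_playwright

def log_post_body(request):
    # post_data_json parses the request body as JSON and is None when empty
    if request.method == "POST":
        print(request.url, request.post_data_json)

with sync_playwright() as p:
    browser = p.chromium.launch()
    page = browser.new_page()
    page.on("request", log_post_body)
    page.goto("https://example.com")  # placeholder URL
    browser.close()
```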

common_network_vm_proxmox.py

Source: common_network_vm_proxmox.py (GitHub)


1"""2 Copyright (C) 2016 Quinn D Granfor <spootdev@gmail.com>3 This program is free software; you can redistribute it and/or4 modify it under the terms of the GNU General Public License5 version 2, as published by the Free Software Foundation.6 This program is distributed in the hope that it will be useful, but7 WITHOUT ANY WARRANTY; without even the implied warranty of8 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU9 General Public License version 2 for more details.10 You should have received a copy of the GNU General Public License11 version 2 along with this program; if not, write to the Free12 Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,13 MA 02110-1301, USA.14"""15import requests16from requests.packages.urllib3.exceptions import InsecureRequestWarning17requests.packages.urllib3.disable_warnings(InsecureRequestWarning)18class CommonNetworkProxMox:19 """20 Class for interfacing via proxmox21 """22 def __init__(self, node_addr, node_user, node_password):23 self.httpheaders = {'Accept': 'application/json',24 'Content-Type': 'application/x-www-form-urlencoded'}25 self.full_url = ('https://%s:8006/api2/json/' % node_addr)26 self.node_user = node_user27 self.node_password = node_password28 # can't return a value with init. So, broke out the connect to handle bad user/pass29 def com_net_prox_api_connect(self):30 self.api_response = requests.post(self.full_url + 'access/ticket', verify=False,31 data={'username': self.node_user,32 'password': self.node_password}).json()33 if self.api_response['data'] is None:34 return None35 self.prox_ticket = {'PVEAuthCookie': self.api_response['data']['ticket']}36 self.httpheaders['CSRFPreventionToken'] = str(37 self.api_response['data']['CSRFPreventionToken'])38 return self.api_response39 def com_net_prox_api_call(self, request_type, api_call_type, post_data=None):40 """41 Do api call to specified connection42 """43 if request_type == "get":44 return requests.get(self.full_url + api_call_type, verify=False,45 cookies=self.prox_ticket, timeout=5).json()46 else:47 return requests.post(self.full_url + api_call_type, verify=False,48 data=post_data,49 cookies=self.prox_ticket,50 headers=self.httpheaders).json()51 ###52 # Access API53 ###54 def com_net_prox_access(self):55 """56 Directory index.57 """58 return self.com_net_prox_api_call('get', 'access')59 ###60 # Access/domains API61 ###62 ###63 # Access/groups API64 ###65 ###66 # Access/roles API67 ###68 ###69 # Access/users API70 ###71 ###72 # Access/acl API73 ###74 ###75 # Access/password API76 ###77 ###78 # Access/ticket API79 ###80 ###81 # Cluster API82 ###83 def com_net_prox_cluster_ndx(self):84 """85 Get the index of the cluster86 """87 return self.com_net_prox_api_call('get', 'cluster')88 ###89 # Cluster/backup API90 ###91 def com_net_prox_cluster_backup(self):92 """93 Get the backup list of the cluster94 """95 return self.com_net_prox_api_call('get', 'cluster/backup')96 def com_net_prox_cluster_backup_create(self, post_data_json):97 """98 Create new vzdump backup job.99 """100 return self.com_net_prox_api_call('post', 'cluster/backup', post_data_json)101 def com_net_prox_cluster_backup_conf(self, backup_id):102 """103 Read vzdump backup job definition.104 """105 return self.com_net_prox_api_call('get', 'cluster/backup/%s' % backup_id)106 def com_net_prox_cluster_backup_update(self, backup_id, post_data_json):107 """108 Update vzdump backup job definition.109 """110 return self.com_net_prox_api_call('get', 'cluster/backup/%s' % backup_id, post_data_json)111 def 
com_net_prox_cluster_backup_delete(self, backup_id):112 """113 Delete vzdump backup job definition.114 """115 return self.com_net_prox_api_call('delete', 'cluster/backup/%s' % backup_id)116 ###117 # Cluster config118 ###119 def com_net_prox_cluster_config_nodes(self):120 return self.com_net_prox_api_call('get', 'cluster/config/nodes')121 # TODO node create122 # TODO node delete123 # TODO node info on cluster124 # TODO node join cluster125 # TODO node Get corosync totem protocol settings.126 ###127 # Cluster/firewall API128 ###129 def com_net_prox_cluster_firewall(self):130 """131 Directory index.132 """133 return self.com_net_prox_api_call('get', 'cluster/firewall')134 ###135 # Cluster/ha API136 ###137 def com_net_prox_cluster_ha(self):138 """139 Directory index.140 """141 return self.com_net_prox_api_call('get', 'cluster/ha')142 def com_net_prox_cluster_ha_groups(self):143 """144 Get HA groups.145 """146 return self.com_net_prox_api_call('get', 'cluster/ha/groups')147 def com_net_prox_cluster_ha_group_create(self, post_data_json):148 """149 Create a new HA group.150 """151 return self.com_net_prox_api_call('put', 'cluster/ha/groups', post_data_json)152 ###153 # Cluster/log API154 ###155 def com_net_prox_cluster_log(self):156 """157 Read cluster log158 """159 return self.com_net_prox_api_call('get', 'cluster/log')160 ###161 # Cluster/nextid API162 ###163 def com_net_prox_cluster_nextid(self):164 """165 Get next free VMID. If you pass an VMID it will raise an error if the ID is already used.166 """167 return self.com_net_prox_api_call('get', 'cluster/nextid')168 ###169 # Cluster/options API170 ###171 def com_net_prox_cluster_options(self):172 """173 Get datacenter options.174 """175 return self.com_net_prox_api_call('get', 'cluster/options')176 def com_net_prox_cluster_options_update(self, post_data_json):177 """178 Set datacenter options.179 """180 return self.com_net_prox_api_call('put', 'cluster/options', post_data_json)181 ###182 # Cluster/resources API183 ###184 def com_net_prox_cluster_resources(self):185 """186 Resources index (cluster wide).187 """188 return self.com_net_prox_api_call('get', 'cluster/resources')189 ###190 # Cluster/status API191 ###192 def com_net_prox_cluster_status(self):193 """194 Get the status of the cluster195 """196 return self.com_net_prox_api_call('get', 'cluster/status')197 ###198 # Cluster/tasks API199 ###200 def com_net_prox_cluster_tasks(self):201 """202 List recent tasks (cluster wide).203 """204 return self.com_net_prox_api_call('get', 'cluster/tasks')205 ###206 # Nodes207 ###208 def com_net_prox_node_index(self, node_name):209 """210 Cluster node index.211 """212 return self.com_net_prox_api_call('get', node_name)213 ###214 # Nodes/apt215 ###216 ###217 # Nodes/ceph218 ###219 ###220 # Nodes/firewall221 ###222 ###223 # Nodes/lxc224 ###225 def com_net_prox_node_lxc_list(self, node_name):226 """227 Get a list of the lxc vms on node228 """229 return self.com_net_prox_api_call('get', 'nodes/%s/lxc' % node_name)230 def com_net_prox_node_lxc_create(self, node_name, host_name, memory_size,231 template_name, storage_id, os_type, cpu_limit, password):232 """233 create lxc on code234 """235 post_data = {'vmid': self.com_net_prox_cluster_nextid()['data'],236 'ostemplate': template_name,237 'hostname': host_name,238 'storage': storage_id,239 'cpulimit': cpu_limit,240 'memory': memory_size,241 'ostype': os_type,242 'password': password,243 # 'net': 'name=eth0',244 # 'rootfs': '32',245 # 'rootfs': 'vm-118-disk-1a, size=32G',246 }247 print(('post %s' % post_data), 
flush=True)248 return self.com_net_prox_api_call('post', 'nodes/%s/lxc' % node_name, post_data)249 def com_net_prox_node_lxc_status(self, node_name, vm_id):250 """251 Get a status of the lxc vm252 """253 return self.com_net_prox_api_call('get', 'nodes/%s/lxc/%s/status/current'254 % (node_name, vm_id))255 def com_net_prox_node_lxc_start(self, node_name, vm_id):256 """257 Start lxc vm258 """259 return self.com_net_prox_api_call('post', 'nodes/%s/lxc/%s/status/start'260 % (node_name, vm_id))261 def com_net_prox_node_lxc_stop(self, node_name, vm_id):262 """263 Stop lxc vm264 """265 return self.com_net_prox_api_call('post', 'nodes/%s/lxc/%s/status/stop'266 % (node_name, vm_id))267 def com_net_prox_node_lxc_snaps(self, node_name, vm_id):268 """269 List snaps for lxc vm270 """271 return self.com_net_prox_api_call('get', 'nodes/%s/lxc/%s/snapshot'272 % (node_name, vm_id))273 def com_net_prox_node_lxc_clone(self, node_name, vm_id, post_data_json):274 """275 Create clone for lxc vm276 """277 return self.com_net_prox_api_call('post', 'nodes/%s/lxc/%s/clone'278 % (node_name, vm_id), post_data_json)279 ###280 # Nodes/network281 ###282 def com_net_prox_net(self, node_name):283 """284 List available networks285 """286 return self.com_net_prox_api_call('get', 'nodes/%s/network' % node_name)287 def com_net_prox_net_create(self, node_name, post_data_json):288 """289 List available networks290 """291 return self.com_net_prox_api_call('put', 'nodes/%s/network' % node_name, post_data_json)292 def com_net_prox_net_delete(self, node_name, post_data_json):293 """294 Revert network configuration changes.295 """296 return self.com_net_prox_api_call('delete', 'nodes/%s/network' % node_name, post_data_json)297 def com_net_prox_net_nic_conf(self, node_name, iface):298 """299 Read network device configuration300 """301 return self.com_net_prox_api_call('get', 'nodes/%s/network/%s' % (node_name, iface))302 def com_net_prox_net_nic_conf_update(self, node_name, iface, post_data_json):303 """304 Update network device configuration305 """306 return self.com_net_prox_api_call('get', 'nodes/%s/network/%s'307 % (node_name, iface), post_data_json)308 def com_net_prox_net_nic_conf_delete(self, node_name, iface):309 """310 Delete network device configuration311 """312 return self.com_net_prox_api_call('get', 'nodes/%s/network/%s' % (node_name, iface))313 ###314 # Nodes/qemu315 ###316 def com_net_prox_node_qemu_list(self, node_name):317 """318 Get a list of the qemu vms on node319 """320 return self.com_net_prox_api_call('get', 'nodes/%s/qemu' % node_name)321 def com_net_prox_node_qemu_status(self, node_name, vm_id):322 """323 Get a status of the qemu vm324 """325 return self.com_net_prox_api_call('get', 'nodes/%s/qemu/%s/status/current'326 % (node_name, vm_id))327 def com_net_prox_node_qemu_start(self, node_name, vm_id):328 """329 Start qemu vm330 """331 return self.com_net_prox_api_call('post', 'nodes/%s/qemu/%s/status/start'332 % (node_name, vm_id))333 def com_net_prox_node_qemu_stop(self, node_name, vm_id):334 """335 Stop qemu vm336 """337 return self.com_net_prox_api_call('post', 'nodes/%s/qemu/%s/status/stop'338 % (node_name, vm_id))339 def com_net_prox_node_qemu_snaps(self, node_name, vm_id):340 """341 List snaps for qemu vm342 """343 return self.com_net_prox_api_call('get', 'nodes/%s/qemu/%s/snapshot'344 % (node_name, vm_id))345 def com_net_prox_node_qemu_clone(self, node_name, vm_id, post_data_json):346 """347 Create clone for qemu vm348 """349 return self.com_net_prox_api_call('post', 'nodes/%s/qemu/%s/clone'350 % 
(node_name, vm_id), post_data_json)351 ###352 # Nodes/scan353 ###354 def com_net_prox_node_scan(self, node_name):355 """356 Index of available scan methods357 """358 return self.com_net_prox_api_call('get', 'nodes/%s/scan' % node_name)359 def com_net_prox_node_scan_glusterfs(self, node_name, post_data_json):360 """361 Scan remote GlusterFS server.362 """363 return self.com_net_prox_api_call('get', 'nodes/%s/scan/glusterfs' % node_name,364 post_data_json)365 def com_net_prox_node_scan_iscsi(self, node_name, post_data_json):366 """367 Scan remote iSCSI server.368 """369 return self.com_net_prox_api_call('get', 'nodes/%s/scan/iscsi' % node_name,370 post_data_json)371 def com_net_prox_node_scan_lvm(self, node_name):372 """373 List local LVM volume groups.374 """375 return self.com_net_prox_api_call('get', 'nodes/%s/scan/lvm' % node_name)376 def com_net_prox_node_scan_lvmthin(self, node_name, post_data_json):377 """378 List local LVM Thin Pools.379 """380 return self.com_net_prox_api_call('get', 'nodes/%s/scan/lvmthin' % node_name,381 post_data_json)382 def com_net_prox_node_scan_nfs(self, node_name, post_data_json):383 """384 Scan remote NFS server.385 """386 return self.com_net_prox_api_call('get', 'nodes/%s/scan/nfs' % node_name,387 post_data_json)388 def com_net_prox_node_scan_usb(self, node_name):389 """390 List local USB devices.391 """392 return self.com_net_prox_api_call('get', 'nodes/%s/scan/usb' % node_name)393 def com_net_prox_node_scan_zfs(self, node_name):394 """395 Scan zfs pool list on local node.396 """397 return self.com_net_prox_api_call('get', 'nodes/%s/scan/zfs' % node_name)398 ###399 # Nodes/services400 ###401 def com_net_prox_node_services(self, node_name):402 """403 Service list.404 """405 return self.com_net_prox_api_call('get', 'nodes/%s/services' % node_name)406 def com_net_prox_node_services_ndx(self, node_name, service_id):407 """408 Directory index409 """410 return self.com_net_prox_api_call('get', 'nodes/%s/services/%s' % (node_name, service_id))411 def com_net_prox_node_service_reload(self, node_name, service_id):412 """413 Reload service.414 """415 return self.com_net_prox_api_call('get', 'nodes/%s/services/%s/relaod'416 % (node_name, service_id))417 def com_net_prox_node_service_restart(self, node_name, service_id):418 """419 Restart service.420 """421 return self.com_net_prox_api_call('get', 'nodes/%s/services/%s/restart'422 % (node_name, service_id))423 def com_net_prox_node_service_start(self, node_name, service_id):424 """425 Start service.426 """427 return self.com_net_prox_api_call('get', 'nodes/%s/services/%s/start'428 % (node_name, service_id))429 def com_net_prox_node_service_state(self, node_name, service_id):430 """431 Service state.432 """433 return self.com_net_prox_api_call('get', 'nodes/%s/services/%s/state'434 % (node_name, service_id))435 def com_net_prox_node_service_stop(self, node_name, service_id):436 """437 Stop service.438 """439 return self.com_net_prox_api_call('get', 'nodes/%s/services/%s/stop'440 % (node_name, service_id))441 ###442 # Nodes/storage443 ###444 def com_net_prox_node_storage_status(self, node_name, post_data_json):445 """446 Get status for all datastores.447 """448 return self.com_net_prox_api_call('get', 'nodes/%s/storage' % node_name, post_data_json)449 ###450 # Nodes/tasks451 ###452 ###453 # Nodes/aplinfo454 ###455 ###456 # Nodes/dns457 ###458 ###459 # Nodes/execute460 ###461 ###462 # Nodes/migreateall463 ###464 ###465 # Nodes/netstat466 ###467 def com_net_prox_node_netstat(self, node_name):468 """469 Get a network 
device counters of node470 """471 return self.com_net_prox_api_call('get', 'nodes/%s/netstat' % node_name)472 ###473 # Nodes/report474 ###475 def com_net_prox_node_report(self, node_name):476 """477 Gather various systems information about a node478 """479 return self.com_net_prox_api_call('get', 'nodes/%s/report' % node_name)480 ###481 # Nodes/rrd482 ###483 ###484 # Nodes/rrddata485 ###486 ###487 # Nodes/spiceshell488 ###489 ###490 # Nodes/startall491 ###492 ###493 # Nodes/status494 ###495 ###496 # Nodes/stopall497 ###498 ###499 # Nodes/subscription500 ###501 ###502 # Nodes/syslog503 ###504 def com_net_prox_node_syslog(self, node_name):505 """506 Get a system log of node507 """508 return self.com_net_prox_api_call('get', 'nodes/%s/syslog' % node_name)509 ###510 # Nodes/time511 ###512 def com_net_prox_node_time(self, node_name):513 """514 Get a system log of node515 """516 return self.com_net_prox_api_call('get', 'nodes/%s/time' % node_name)517 ###518 # Nodes/version519 ###520 ###521 # Nodes/vncshell522 ###523 ###524 # Nodes/vncwebsocket525 ###526 ###527 # Nodes/vzdump528 ###529 ###530 # Pools531 ###532 def com_net_prox_pools(self):533 """534 Get the list of pools535 """536 return self.com_net_prox_api_call('get', 'pools')537 def com_net_prox_pool_create(self, post_data_json):538 """539 Create pool540 """541 return self.com_net_prox_api_call('post', 'pools', post_data_json)542 def com_net_prox_pool_update(self, pool_name, post_data_json):543 """544 Update pool545 """546 return self.com_net_prox_api_call('post', 'pools/%s' % pool_name, post_data_json)547 def com_net_prox_pool_delete(self, pool_name):548 """549 Delete pool550 """551 return self.com_net_prox_api_call('delete', 'pools/%s' % pool_name)552 ###553 # Storage554 ###555 def com_net_prox_storage(self):556 """557 Get the list of storage558 """559 return self.com_net_prox_api_call('get', 'storage')560 def com_net_prox_storage_create(self, post_data_json):561 """562 Create a new storage.563 """564 return self.com_net_prox_api_call('post', 'storage', post_data_json)565 def com_net_prox_storage_config(self, storage_name):566 """567 Update storage configuration.568 """569 return self.com_net_prox_api_call('get', 'storage/%s' % storage_name)570 def com_net_prox_storage_config_update(self, storage_name, post_data_json):571 """572 Read storage configuration.573 """574 return self.com_net_prox_api_call('get', 'storage/%s' % storage_name, post_data_json)575 def com_net_prox_storage_config_delete(self, storage_name):576 """577 Delete storage configuration.578 """579 return self.com_net_prox_api_call('delete', 'storage/%s' % storage_name)580 ###581 # Version582 ###583 def com_net_prox_api_version(self):584 """585 # grab version of api from node586 """587 return self.com_net_prox_api_call('get', 'version')588def com_net_prox_create_start_container(PROX_CONNECTION, JENKINS_BUILD_VIM, image_path):589 # build the container if doesn't already exist590 lxc_dict = {}591 for lxc_server in PROX_CONNECTION.com_net_prox_node_lxc_list('pve')['data']:592 lxc_dict[lxc_server['name']] = (lxc_server['vmid'], lxc_server['status'])593 print(('lxc: %s' % lxc_dict), flush=True)594 if JENKINS_BUILD_VIM in lxc_dict:595 # container exists, make sure it's running596 print(('status: %s' % str(lxc_dict[JENKINS_BUILD_VIM][1])), flush=True)597 # make sure it's started598 if lxc_dict[JENKINS_BUILD_VIM] != 'running':599 # start up the vm600 print(('start vim: %s' % lxc_dict[JENKINS_BUILD_VIM][0]), flush=True)601 print((PROX_CONNECTION.com_net_prox_node_lxc_start(602 'pve', 
lxc_dict[JENKINS_BUILD_VIM][0])), flush=True)603 else:604 # create the container605 print('create JENKINS_BUILD_VIM', flush=True)606 print((PROX_CONNECTION.com_net_prox_node_lxc_create('pve', JENKINS_BUILD_VIM, 4096,607 image_path,608 'ZFS_VM', 'alpine', 8, 'metaman')),609 flush=True)610 # keep an eye on task and see when its completed611 # while node.tasks(taskid).status()['status'] == 'running':612 # time.sleep(1)613# this is for VM!!! not container614# # check status of vm615# for vm_server in PROX_CONNECTION.com_net_prox_node_qemu_list('pve')['data']:616# if vm_server['name'] == JENKINS_BUILD_VIM:617# if vm_server['status'] == 'stopped':618# # start up the vm619# PROX_CONNECTION.com_net_prox_node_qemu_start('pve', vm_server['vmid'])620# time.sleep(60) # wait for box to boot...
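
In this wrapper, `post_data_json` is simply a dict of form fields passed through to `requests`. A hypothetical usage sketch (the host, credentials, and backup-job fields below are placeholders, not values from the source):

```python
# Hypothetical host and credentials, for illustration only.
prox = CommonNetworkProxMox('10.0.0.5', 'root@pam', 'secret')
if prox.com_net_prox_api_connect() is None:
    raise SystemExit('Proxmox authentication failed')

# Despite the name, post_data_json here is a plain dict of form fields.
print(prox.com_net_prox_cluster_backup_create({'starttime': '02:00', 'all': 1}))
```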


api.py

Source: api.py (GitHub)


```python
import pandas as pd
import numpy as np
import json
import graph_func
import base64
from mzDataset import MzDataSet, DimRedDataSet
from flask import Flask, abort, request, make_response
from flask_cors import CORS
from PIL import Image
from io import BytesIO
from argparse import ArgumentParser
from os import listdir
from os.path import isdir
import re

parser = ArgumentParser()
parser.add_argument('-j', '--json', dest='json_name', required=True, nargs='?',
                    help='name of the json file', metavar='filename', type=str)
args = parser.parse_args()

# Constants for folder structure
path_to_data = 'data/'
path_to_json = 'json/'
path_to_matrix = 'matrix/'
path_to_dataset = 'dataset/'
path_to_dimreduce = 'dimreduce/'
path_to_hist_images = 'histo-images/'
matrix_blueprint = 'similarity-matrix-{}.npy'
dimreduce_pattern_blueprint = r'^dimreduce-{}.*\.h5'  # raw string so \. is a literal dot
dataset_blueprint = '{}.h5'

# dict to handle multiple datasets
datasets = {}
colorscales = {
    'Viridis': 'viridis',
    'Magma': 'magma',
    'Inferno': 'inferno',
    'Plasma': 'plasma',
    'PiYG': 'PiYG'
}
aggregation_methods = {
    'mean': np.mean,
    'median': np.median,
    'min': np.min,
    'max': np.max,
}

with open(path_to_data + path_to_json + args.json_name, 'r') as file:
    json_content = json.load(file)['graphs']
    for graphKey in json_content.keys():
        datasets[json_content[graphKey]['dataset']] = ''
    firstDataset = json_content['graph0']
    graph_func.graph_initialisation(
        np.load((path_to_data + path_to_matrix + matrix_blueprint).format(firstDataset['dataset'])),
        firstDataset['threshold'])
    del json_content
    del firstDataset

for dataset_name in datasets.keys():
    '''
    Explanation of the datasets dict:
    dataset: mz cube for given dataset
    dimreduce: dimreduce dataset or None if there is no dimreduce dataset
    histo_images: name of files of histo images for given dataset or empty list if there are no images
    '''
    image_path = path_to_data + path_to_hist_images + dataset_name
    dimreduce_file_candidates = [
        file_name for file_name in listdir(path_to_data + path_to_dimreduce)
        if re.match(dimreduce_pattern_blueprint.format(dataset_name), file_name)]
    datasets[dataset_name] = {
        'dimreduce': None if len(dimreduce_file_candidates) == 0 else DimRedDataSet(
            pd.read_hdf(path_to_data + path_to_dimreduce + dimreduce_file_candidates[0]).droplevel('dataset')),
        'dataset': MzDataSet(
            pd.read_hdf((path_to_data + path_to_dataset + dataset_blueprint).format(dataset_name)).droplevel('dataset')),
        'histo_images': [image for image in listdir(image_path)] if isdir(image_path) else []
    }
    if datasets[dataset_name]['dimreduce'] is None:
        print('No dimensionality reduction image for dataset', dataset_name)
    if len(datasets[dataset_name]['histo_images']) == 0:
        if not isdir(image_path):
            print('No histo image folder for dataset', dataset_name)
        else:
            print('Histo image folder is empty for dataset', dataset_name)


# returns list of allowed merge methods for mz intensities
def aggregation_methods_names():
    return list(aggregation_methods.keys())


# returns names of all available datasets
def dataset_names():
    return list(datasets.keys())


# returns list of all mz_values
def mz_values(ds_name):
    return datasets[ds_name]['dataset'].getMzValues()


# generates json file for graph
def graph_data_all_datasets():
    with open(path_to_data + path_to_json + args.json_name, 'r') as file:
        try:
            data = json.load(file)
        except:
            data = {}
    return data


# rcube: reference cube, mcube: matching cube, ocube: overlap cube
def selection_match(polygon_ref, polygon_match, min_intensity, min_overlap, aggregation_method):
    # get thresholds for the minimum signal intensity that should be considered
    t = np.amax(polygon_ref) * min_intensity
    polygon_ref = (polygon_ref >= t)  # binarization
    # get the maximum for each column and multiply by the min_intensity
    t = np.amax(polygon_match) * min_intensity
    polygon_match = (polygon_match >= t)  # binarization
    # find amount of common entries in selected and node dataset
    overlap = polygon_ref * polygon_match
    # overlap = (polygon_ref * polygon_match) + (~polygon_ref * ~polygon_match)
    match_score = overlap.sum() / len(polygon_ref)
    return match_score >= min_overlap


def check_nodes_for_match(ds_name, node_data, selected_mzs, selected_points,
                          aggregation_method, min_intensity, min_overlap, lasso_id):
    mzDataSet = datasets[ds_name]["dataset"]
    cube = mzDataSet.getCube()
    selected_points = list(zip(*selected_points))
    # 4: DimRed Image
    # 5: Modality Image
    # See constants.js in frontend
    if lasso_id in [4, 5]:
        polygon_ref = mzDataSet.getBinaryImage(uInt8=True)[selected_points]
    else:
        polygon_ref = aggregation_method(cube[:, :, mzDataSet.getMzIndex(selected_mzs)], axis=2)[selected_points]
    node_names = []
    for node in node_data:
        polygon_match = aggregation_method(cube[:, :, mzDataSet.getMzIndex(node['mzs'])], axis=2)[selected_points]
        match = selection_match(polygon_ref, polygon_match, min_intensity, min_overlap, aggregation_method)
        if match:
            node_names.append(node['name'])
    return node_names


app = Flask(__name__)
CORS(app)


# get available merge methods if mz image of multiple images is queried
@app.route('/mz-merge-methods')
def aggregation_methods_action():
    return json.dumps(aggregation_methods_names())


# get all dataset names
@app.route('/datasets')
def datasets_action():
    return json.dumps(dataset_names())


@app.route('/datasets/<dataset_name>/images_info')
def datasets_available_images(dataset_name):
    if dataset_name not in dataset_names():
        return abort(400)
    return json.dumps({
        'dimreduce': datasets[dataset_name]['dimreduce'] is not None,
        'histo': datasets[dataset_name]['histo_images']
    })


# get mz values of dataset
@app.route('/datasets/<dataset_name>/mzvalues')
def datasets_mzvalues_action(dataset_name):
    if dataset_name not in dataset_names():
        return abort(400)
    return json.dumps(mz_values(dataset_name))


@app.route('/graph/centrality', methods=['GET'])
def centrality():
    return json.dumps(graph_func.betweenness_centrality())


@app.route('/graph/cluster_coefficient', methods=['GET'])
def clust_coeff():
    return json.dumps(graph_func.cluster_coefficient())


@app.route('/graph/eccentricity', methods=['GET'])
def eccentricity():
    return json.dumps(graph_func.eccentricity())


@app.route('/graph/degree', methods=['GET'])
def degree():
    return json.dumps(graph_func.degree())


@app.route('/graph/group_degree', methods=['GET'])
def group_degree():
    return json.dumps(graph_func.group_degree())


@app.route('/graph/avg_edge_weights', methods=['GET'])
def avg_edge_weights():
    return json.dumps(graph_func.average_weight_per_edge())


@app.route('/graph/between_group_degree', methods=['GET'])
def bet_group_degree():
    return json.dumps(graph_func.between_group_degree())


@app.route('/graph/within_group_degree', methods=['GET'])
def with_group_degree():
    return json.dumps(graph_func.within_group_degree())


@app.route('/graph/within_cluster_centrality', methods=['GET'])
def within_cluster_centrality():
    return json.dumps(graph_func.within_cluster_centrality())


@app.route('/graph/spanning_tree_degree', methods=['GET'])
def spanning_tree_degree():
    return json.dumps(graph_func.minimal_spanning_tree_degree())


@app.route('/graph/avg_neighbor_degree', methods=['GET'])
def avg_neighbor_degree():
    return json.dumps(graph_func.avg_neighbor_degree())


@app.route('/graph/update_cluster', methods=['PATCH'])
def update_graph_cluster():
    graph_func.update_graph(request.get_data().decode('utf-8'))
    return 'OK'


@app.route('/graph/change_graph', methods=['PATCH'])
def change_graph():
    data = json.loads(request.get_data().decode('utf-8'))
    dataset_name = data['name']
    threshold = data['threshold']
    if dataset_name not in dataset_names():
        return abort(400)
    graph_func.graph_initialisation(
        np.load((path_to_data + path_to_matrix + matrix_blueprint).format(dataset_name)), threshold)
    return json.dumps('OK')


@app.route('/datasets/<dataset_name>/imagedimensions', methods=['GET'])
def dataset_image_dimension(dataset_name):
    shape = datasets[dataset_name]['dataset'].getCube().shape
    return json.dumps({'height': shape[0], 'width': shape[1]})


# gets a list of visible nodes from the frontend
# get a list of selected points
# returns which nodes are similar
@app.route('/datasets/<dataset_name>/imagedata/match', methods=['POST'])
def datasets_imagedata_selection_match_nodes_action(dataset_name):
    if dataset_name not in dataset_names():
        return abort(400)
    try:
        post_data = request.get_data()
        post_data_json = json.loads(post_data.decode('utf-8'))
        post_data_visible_node_data = post_data_json['visibleNodes']
        post_data_selected_mzs = [float(i) for i in post_data_json['selectedMzs']]
        post_data_selected_points = post_data_json['selectedPoints']
        post_data_aggregation_method = post_data_json['method']
        post_data_min_intensity = float(post_data_json['minIntensity']) / 100
        post_data_min_overlap = float(post_data_json['minOverlap']) / 100
        lasso_id = int(post_data_json['lasso_id'])
    except Exception as e:
        print("Exception during Dimension Reduction Matching.")
        print(e)
        return abort(400)
    if post_data_aggregation_method not in aggregation_methods_names():
        return abort(400)
    ret = check_nodes_for_match(
        dataset_name,
        post_data_visible_node_data,
        post_data_selected_mzs,
        post_data_selected_points,
        aggregation_methods[post_data_aggregation_method],
        post_data_min_intensity,
        post_data_min_overlap,
        lasso_id
    )
    return json.dumps(ret)


# get mz image data for dataset and mz values
# specified merge method is passed via GET parameter
# mz values are passed via post request
@app.route('/datasets/<dataset_name>/mzimage', methods=['POST'])
def datasets_imagedata_multiple_mz_action(dataset_name):
    if dataset_name not in dataset_names():
        return abort(400)
    try:
        post_data = request.get_data()
        post_data_json = json.loads(post_data.decode('utf-8'))
        aggregation_method = post_data_json['method']  # was misspelled 'aggeregation_method'
        colorscale = post_data_json['colorscale']
        post_data_mz_values = [float(i) for i in post_data_json['mzValues']]
    except:
        return abort(400)
    if len(post_data_mz_values) == 0:
        return abort(400)
    img_io = BytesIO()
    Image.fromarray(
        datasets[dataset_name]['dataset'].getColorImage(
            post_data_mz_values,
            method=aggregation_methods[aggregation_method],
            cmap=colorscales[colorscale]),
        mode='RGBA'
    ).save(img_io, 'PNG')
    img_io.seek(0)
    response = make_response('data:image/png;base64,' + base64.b64encode(img_io.getvalue()).decode('utf-8'), 200)
    response.mimetype = 'text/plain'
    return response


@app.route('/datasets/<dataset_name>/dimreduceimage', methods=['POST'])
def datasets_imagedata_dimreduce_image(dataset_name):
    if dataset_name not in dataset_names():
        return abort(400)
    if datasets[dataset_name]['dimreduce'] is None:
        return abort(400)
    try:
        post_data = request.get_data()
        post_data_json = json.loads(post_data.decode('utf-8'))
        try:
            aggregation_method = post_data_json['method']
            mz_values = [float(i) for i in post_data_json['mzValues']]
            if len(mz_values) == 0:
                return abort(400)
        except KeyError:
            aggregation_method = None
            mz_values = None
        try:
            alpha = float(post_data_json['alpha']) / 100
        except KeyError:
            alpha = None
        except ValueError:
            abort(400)
    except:
        return abort(400)
    img_io = BytesIO()
    if aggregation_method is None:
        Image.fromarray(
            datasets[dataset_name]['dimreduce'].getImage(),
            mode='RGBA'
        ).save(img_io, 'PNG')
    else:
        intensity = datasets[dataset_name]['dataset'].getGreyImage(
            mz_values, method=aggregation_methods[aggregation_method])
        Image.fromarray(
            datasets[dataset_name]['dimreduce'].getRelativeImage(intensity)
            if alpha is None
            else datasets[dataset_name]['dimreduce'].getAbsoluteImage(intensity, alpha),
            mode='RGBA'
        ).save(img_io, 'PNG')
    img_io.seek(0)
    response = make_response('data:image/png;base64,' + base64.b64encode(img_io.getvalue()).decode('utf-8'), 200)
    response.mimetype = 'text/plain'
    return response


@app.route('/datasets/<dataset_name>/hist')
def hist_image(dataset_name):
    index = request.args.get('index')
    if index is None:
        index = 0
    else:
        try:
            index = int(index)
        except ValueError:
            return abort(400)
    if index >= len(datasets[dataset_name]['histo_images']):
        return abort(400)
    with open(path_to_data + path_to_hist_images + '{}/{}'.format(
            dataset_name, datasets[dataset_name]['histo_images'][index]), 'rb') as file_reader:
        response = make_response('data:image/png;base64,' + base64.b64encode(file_reader.read()).decode('utf-8'))
        response.mimetype = 'text/plain'
        return response


# get graph data for all datasets
@app.route('/datasets/graphdata')
def datasets_all_datasets_all_graphdata_action():
    return json.dumps(graph_data_all_datasets())


if __name__ == '__main__':
    ...  # truncated in the original snippet
```
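
The `/datasets/<dataset_name>/mzimage` route above decodes the raw POST body with `json.loads` and expects `method`, `colorscale`, and `mzValues` keys. A hypothetical client call (the host, dataset name, and m/z value are placeholders; Flask's development server defaults to port 5000):

```python
import requests

# Placeholders: adjust host, dataset name, and m/z values to your deployment.
resp = requests.post(
    'http://localhost:5000/datasets/example-dataset/mzimage',
    json={'method': 'mean', 'colorscale': 'Viridis', 'mzValues': [885.55]},
    timeout=10,
)
print(resp.text[:40])  # 'data:image/png;base64,...'
```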


tests.py

Source: tests.py (GitHub)


```python
from reddit_parser import FileWriter
from shutil import copy2
import json
import os
import requests
import unittest


class FileReplacer:
    reddit_test_file_name = "reddit-201901191955.txt"

    def replace_reddit_by_test_file(self):
        """Replaces reddit-file by test-file. Copies existing reddit-file to temporary while maintaining
        its original name, keeping it in the name of the temporary file.
        Removes reddit-file and copies test-file to the file with specified name.
        If initially reddit-file didn't exist the creation of temporary file is skipped.
        """
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        if path_to_reddit_file:
            chars_count_to_skip = len('reddit-')
            reddit_f_datetime_start_ind = path_to_reddit_file.rfind('reddit-') + chars_count_to_skip
            reddit_f_datetime = path_to_reddit_file[reddit_f_datetime_start_ind:]
            copy2(path_to_reddit_file, f'temp-{reddit_f_datetime}')
            os.remove(path_to_reddit_file)
        path_to_test_file = FileWriter.define_path_to_file('test-')
        copy2(path_to_test_file, self.reddit_test_file_name)

    def restore_pre_test_state(self):
        """Restores pre-test state of the directory. If tested file with specified name exists, removes it.
        If temporary file exists, defines original name of reddit-file, copies this file to the file
        with found name and removes temporary file.
        """
        if os.path.exists(self.reddit_test_file_name):
            os.remove(self.reddit_test_file_name)
        path_to_temp_file = FileWriter.define_path_to_file('temp-')
        if path_to_temp_file:
            chars_count_to_skip = len('temp-')
            former_reddit_f_datetime_start_ind = path_to_temp_file.rfind('temp-') + chars_count_to_skip
            former_reddit_f_datetime = path_to_temp_file[former_reddit_f_datetime_start_ind:]
            copy2(path_to_temp_file, f'reddit-{former_reddit_f_datetime}')
            os.remove(path_to_temp_file)


class DirReorganizerMixin:
    def setUp(self):
        """Defines setUp behavior for unittests. Calls the function that replaces reddit-file by test-file"""
        FileReplacer().replace_reddit_by_test_file()

    def tearDown(self):
        """Defines tearDown behavior for unittests. Calls the function that restores pre-test state of the directory"""
        FileReplacer().restore_pre_test_state()


class PostDataCollection:
    """Contains post dictionaries that will be used in unittests"""
    existent_post_dict = {'UNIQUE_ID': '48dde13e404611eb9360036bb7a2b36b',
                          'post URL': 'https://www.reddit.com/r/blog/comments/k967mm/reddit_in_2020/',
                          'username': 'reddit_irl', 'user karma': '197,182', 'user cake day': 'April 30, 2020',
                          'post karma': '18,395', 'comment karma': '11,835', 'post date': '09.12.2020',
                          'number of comments': '8.9k', 'number of votes': '168k', 'post category': 'blog'}
    nonexistent_post_dict = {'UNIQUE_ID': '00dde13e404611eb9360036bb7a2b36b',
                             'post URL': 'https://www.reddit.com/r/blog/comments/k967mm/reddit_in_2020/',
                             'username': 'reddit_irl', 'user karma': '197,182', 'user cake day': 'April 30, 2020',
                             'post karma': '18,395', 'comment karma': '11,835', 'post date': '09.12.2020',
                             'number of comments': '8.9k', 'number of votes': '168k', 'post category': 'blog'}
    incorrect_post_dict = {'UNIQUE_ID': '48dde13e404611eb9360036bb7a2b36b'}


class TestGET(DirReorganizerMixin, unittest.TestCase):
    def test_get_posts_success(self):
        print('testing get_posts success')
        req = requests.get("http://localhost:8087/posts/", timeout=5)
        expected_post_dict = PostDataCollection.existent_post_dict
        self.assertEqual((req.status_code, req.json()[1]), (200, expected_post_dict))

    def test_get_posts_no_file(self):
        print('testing get_posts with no file detected')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        os.remove(path_to_reddit_file)
        req = requests.get("http://localhost:8087/posts/", timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))

    def test_get_posts_empty_file(self):
        print('testing get_posts with empty file')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        with open(path_to_reddit_file, 'w') as file:
            file.write('')
        req = requests.get("http://localhost:8087/posts/", timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))

    def test_get_line_success(self):
        print('testing get_line success')
        req = requests.get("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/", timeout=5)
        expected_post_dict = PostDataCollection.existent_post_dict
        self.assertEqual((req.status_code, req.json()), (200, expected_post_dict))

    def test_get_line_no_file(self):
        print('testing get_line with no file detected')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        os.remove(path_to_reddit_file)
        req = requests.get("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))

    def test_get_line_empty_file(self):
        print('testing get_posts with empty file')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        with open(path_to_reddit_file, 'w') as file:
            file.write('')
        req = requests.get("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))

    def test_get_line_not_found(self):
        print('testing get_line not found')
        req = requests.get("http://localhost:8087/posts/00dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))

    def test_get_url_not_valid(self):
        print('testing get_url is not valid')
        req = requests.get("http://localhost:8087/posts/000dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))


class TestPOST(DirReorganizerMixin, unittest.TestCase):
    def test_add_line_success(self):
        print('testing add_line success')
        post_data = PostDataCollection.nonexistent_post_dict
        post_data_json = json.dumps(post_data)
        req = requests.post("http://localhost:8087/posts/", data=post_data_json, timeout=5)
        self.assertEqual((req.status_code, req.json()), (201, {'UNIQUE_ID': 101}))

    def test_add_line_empty_file(self):
        print('testing add_line with empty file')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        with open(path_to_reddit_file, 'w') as file:
            file.write('')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        req = requests.post("http://localhost:8087/posts/", data=post_data_json, timeout=5)
        self.assertEqual((req.status_code, req.json()), (201, {'UNIQUE_ID': 1}))

    def test_add_line_duplicate(self):
        print('testing add_line with duplicate')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        req = requests.post("http://localhost:8087/posts/", data=post_data_json, timeout=5)
        self.assertEqual((req.status_code, req.content), (409, b''))

    def test_add_line_incorrect_post_data(self):
        print('testing add_line with incorrect data')
        post_data = PostDataCollection.incorrect_post_dict
        post_data_json = json.dumps(post_data)
        req = requests.post("http://localhost:8087/posts/", data=post_data_json, timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))

    def test_post_url_not_valid(self):
        print('testing post_url is not valid')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        req = requests.post("http://localhost:8087/postsinvalid/", data=post_data_json, timeout=5)
        self.assertEqual((req.status_code, req.content), (404, b''))


class TestDELETE(DirReorganizerMixin, unittest.TestCase):
    def test_del_line_success(self):
        print('testing del_line success')
        req = requests.delete("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual(req.status_code, 200)

    def test_del_line_no_file(self):
        print('testing del_line with no file detected')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        os.remove(path_to_reddit_file)
        req = requests.delete("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_del_line_empty_file(self):
        print('testing del_line with empty file')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        with open(path_to_reddit_file, 'w') as file:
            file.write('')
        req = requests.delete("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_del_line_not_found(self):
        print('testing del_line not found')
        req = requests.delete("http://localhost:8087/posts/00dde13e404611eb9360036bb7a2b36b/", timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_delete_url_not_valid(self):
        print('testing delete_url is not valid')
        req = requests.delete("http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/del", timeout=5)
        self.assertEqual(req.status_code, 404)


class TestPUT(DirReorganizerMixin, unittest.TestCase):
    def test_change_line_success(self):
        print('testing change_line success')
        post_data = PostDataCollection.nonexistent_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 200)

    def test_change_line_no_file(self):
        print('testing change_line with no file detected')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        os.remove(path_to_reddit_file)
        post_data = PostDataCollection.nonexistent_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_change_line_empty_file(self):
        print('testing change_line with empty file')
        path_to_reddit_file = FileWriter.define_path_to_file('reddit-')
        with open(path_to_reddit_file, 'w') as file:
            file.write('')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_change_line_duplicate(self):
        print('testing change_line with duplicate')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 409)

    def test_change_line_incorrect_put_data(self):
        print('testing change_line with incorrect data')
        post_data = PostDataCollection.incorrect_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/48dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_change_line_not_found(self):
        print('testing change_line not found')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/00dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 404)

    def test_put_url_not_valid(self):
        print('testing put_url is not valid')
        post_data = PostDataCollection.existent_post_dict
        post_data_json = json.dumps(post_data)
        url = "http://localhost:8087/posts/bug48dde13e404611eb9360036bb7a2b36b/"
        req = requests.put(url, data=post_data_json, timeout=5)
        self.assertEqual(req.status_code, 404)


def test_outcome_file(filename):
    """Defines whether reddit-file is correct. If each line of this file contains exactly 11 values
    and the number of lines is equal to 100, returns True. If the file is incorrect, returns False.
    """
    if os.path.exists(filename):
        with open(filename) as file:
            data = file.readline()
            lines_count = 0
            while data:
                data = data.split(';')
                if not len(data) == 11:
                    return False  # was a bare 'return' (None), despite the docstring
                data = file.readline()
                lines_count += 1
            return lines_count == 100
    return "File with the specified name doesn't exist"


if __name__ == "__main__":
    ...  # truncated in the original snippet
```


serverOld.py

Source: serverOld.py (GitHub)


```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import os
import json
import time
import uuid
from datetime import datetime
from common.Rqueue import Rqueue
from config import config
from common.Log import Logger
import redis
import flask
from werkzeug.utils import secure_filename  # modern Werkzeug moved it out of the package root

# set the logger
logger = Logger('/var/log/MediaParse_server.log', 'debug', 'server')


class Server:
    def __init__(self):
        self.task_queue = Rqueue(name='parse_task')
        self.redis = redis.Redis(host='localhost', port=6379, db=0)

    def get_tasks_info_all(self):
        try:
            tasks_info_all = []
            tasks = self.redis.keys("parse_task:*")
            for task_one in tasks:
                task_info_one = self.redis.hgetall(task_one)
                if task_info_one is not None:
                    tasks_info_all.append(task_info_one)
            logger.debug("get_tasks_info_all len:%s" % (len(tasks_info_all)))
            return tasks_info_all
        except Exception as ex:  # was Python 2 'except Exception,ex:' syntax throughout
            logger.error("get_tasks_info_all error:%s" % (ex))
            return []

    def get_system_info(self):
        try:
            info = self.redis.info()
            info = json.dumps(info)
            return info
        except Exception as ex:
            logger.error("get_system_info error:%s" % (ex))
            return "{}"

    def delete_task(self, task_id):
        try:
            logger.debug("delete_task:%s" % ('parse_task:%s' % (task_id)))
            self.redis.delete('parse_task:%s' % (task_id))
        except Exception as ex:
            logger.error("delete_task error:%s" % (ex))

    def create_task(self, url):
        try:
            # generate id
            task_id = uuid.uuid1()
            logger.debug('generate task_id: %s' % (task_id))
            post_data_json = {}
            post_data_json['url'] = url
            post_data_json['id'] = str(task_id)
            post_data_json['status'] = 'created'
            post_data_json['create_time'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            # push to task queue
            self.task_queue.push(task_id)
            self.redis.hmset("parse_task:%s" % (task_id), post_data_json)

            # test
            data = self.redis.hgetall("parse_task:%s" % (task_id))
            logger.debug('task data : %s' % (json.dumps(data)))
        except Exception as ex:
            logger.error("create_task error:%s" % (ex))

    def get_task_stream_data(self, task_id, start_time, end_time):
        try:
            logger.debug("get_task_stream_data id=%s, start_time=%s, end_time=%s"
                         % (task_id, start_time, end_time))
            # e.g: ZRANGEBYSCORE stream_data:cbd54cc6-e15f-11e5-b58c-00163e022b22 0 1
            data = self.redis.zrangebyscore('stream_data:%s' % (task_id), start_time, end_time)
            logger.debug("get_task_stream_data len=%s" % (len(data)))
            return data
        except Exception as ex:
            logger.error("get_task_stream_data error:%s" % (ex))  # was mislabeled 'delete_task error'
            return []


# get the server
server = Server()
app = flask.Flask(__name__)
UPLOAD_FOLDER = './static/uploads'


# web
@app.route('/')
@app.route('/index')
def index():
    try:
        # return "Hello, this is Media Parser."
        tasks_info_all = server.get_tasks_info_all()
        return flask.render_template('index.html', task_all=tasks_info_all)
    except Exception as ex:
        logger.error("index error:%s" % (ex))
        return "Hello, this is Media Parser."


@app.route('/analysis.html', methods=['GET'])
def redis_monitor_page():
    return flask.render_template('analysis.html')


# api
@app.route('/api/gettime')
def gettime():
    return "%s" % (datetime.now())


@app.route('/api/system_info')
def get_system_info():
    return server.get_system_info()


@app.route('/api/task/delete', methods=['GET', 'POST'])
def api_task_delete():
    try:
        response = {}
        # get the task id that needs to be deleted
        task_id = flask.request.form.get('task_id', 'null')
        if task_id == 'null':
            logger.warning("not get param task_id")
            response['result'] = 'error'
        else:
            logger.debug("get param task_id=%s" % (task_id))
            # delete the task data
            server.delete_task(task_id)
            response['result'] = 'success'
        return json.dumps(response)
    except Exception as ex:
        logger.error("api_task_delete error:%s" % (ex))
        response['result'] = 'error'
        response['message'] = '%s' % (ex)
        return json.dumps(response)


@app.route('/api/task/create', methods=['GET', 'POST'])
def api_task_create():
    try:
        response = {}
        # get the url for the new task
        url = flask.request.form.get('url', 'null')
        if url == 'null':
            logger.warning("not get param url")
            response['result'] = 'error'
        else:
            logger.debug("get param url=%s" % (url))
            # create the task
            server.create_task(url)
            response['result'] = 'success'
        return json.dumps(response)
    except Exception as ex:
        logger.error("api_task_create error:%s" % (ex))
        response['result'] = 'error'
        response['message'] = '%s' % (ex)
        return json.dumps(response)


@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
    try:
        if flask.request.method == 'GET':
            return flask.render_template('upload.html')
        elif flask.request.method == 'POST':
            f = flask.request.files['file']
            fname = secure_filename(f.filename)
            f.save(os.path.join(UPLOAD_FOLDER, fname))
            server.create_task("%s/%s" % (UPLOAD_FOLDER, fname))
            return 'upload success'
    except Exception as ex:
        logger.error("upload_file error:%s" % (ex))
        return 'error:%s' % (ex)


@app.route('/api/task/data', methods=['GET', 'POST'])
def api_task_data():
    try:
        response = {}
        start_time = flask.request.form.get('start_time', 'null')
        if start_time == 'null':
            logger.warning("not get param start_time")
            response['result'] = 'error'
        else:
            logger.debug("get param start_time=%s" % (start_time))
            end_time = flask.request.form.get('end_time', 'null')
            if end_time == 'null':
                logger.warning("not get param end_time")
                response['result'] = 'error'
            else:
                logger.debug("get param end_time=%s" % (end_time))
                task_id = flask.request.form.get('task_id', 'null')
                if task_id == 'null':
                    logger.warning("not get param task_id")
                    response['result'] = 'error'
                else:
                    logger.debug("get param task_id=%s" % (task_id))
                    data = server.get_task_stream_data(task_id, start_time, end_time)
                    return json.dumps(data)
        return json.dumps(response)
    except Exception as ex:
        logger.error("api_task_data error:%s" % (ex))  # was mislabeled 'api_task_create error'
        response['result'] = 'error'
        response['message'] = '%s' % (ex)
        return json.dumps(response)


if __name__ == "__main__":
    ...  # truncated in the original snippet
```
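
Here `create_task` builds the `post_data_json` dict server-side and stores it as a Redis hash keyed by the task id. A hypothetical client call against the `/api/task/create` route (the host and port are placeholders; Flask's development server defaults to port 5000, and the route reads a form field named `url`):

```python
import requests

# Placeholder address and media URL, for illustration only.
resp = requests.post('http://localhost:5000/api/task/create',
                     data={'url': 'http://example.com/video.mp4'}, timeout=5)
print(resp.json())  # {'result': 'success'} when the task was queued
```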


Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright: Playwright is comparatively new but has gained solid popularity. Get to know some history of Playwright, along with interesting facts connected with it.
  2. How To Install Playwright: Learn which basic configuration and dependencies are required to install Playwright and run a test. Get step-by-step directions for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of compelling features such as the Playwright Test Generator and Inspector, the Playwright Reporter, and the Playwright auto-waiting mechanism. Read up on these features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating it with other elements. Learn how to perform component testing with the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage,” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright: Playwright interacts with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and different types of dropdowns, such as single-select and multi-select. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know the differences between the two testing frameworks: how they differ from one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest leverages test performance to the utmost. You can run multiple Playwright tests in parallel on the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, and .NET. However, there are various advantages to Python end-to-end testing with Playwright because of its versatile utility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get hands-on with Playwright end-to-end testing and learn to use exciting features such as TraceViewer, debugging, networking, component testing, visual testing, and many more.
  12. Playwright Video Tutorial: Watch video tutorials on Playwright testing from experts and get an in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

