How to use the process_output method in autotest

Best Python code snippet using autotest_python

do-svnsync.py

Source:do-svnsync.py Github

copy

Full Screen

...40 return "Can't find svnlook binary at {}".format(svnlook_bin)41 if not os.path.isdir(local_repo_directory):42 return "Can't find local repository directory at {}".format(local_repo_directory)43 return None44def process_output(file):45 output = file.read().split()46 for x in range(0, len(output)):47 output[x] = output[x].decode()48 return output49def append_to_list(original_list, list_to_append):50 for i in list_to_append:51 original_list.append(i)52def get_remote_dir_names():53 p = Popen([ssh_bin,54 '{}@{}'.format(55 ssh_user_name,56 server_name57 ),58 'find',59 '/var/lib/scm/repositories/svn/',60 '-maxdepth', '1',61 '-type', 'd',62 '-exec', 'basename {} \;'], stdout=PIPE, stderr=PIPE)63 output = []64 error = []65 while p.poll() is None:66 sleep(1)67 append_to_list(output, process_output(p.stdout))68 append_to_list(error, process_output(p.stderr))69 append_to_list(output, process_output(p.stdout))70 append_to_list(error, process_output(p.stderr))71 if not p.returncode == 0:72 print("Failed to list remote directories with return code {}".format(p.returncode), file=sys.stderr)73 print("Process Output: {}".format(' '.join(output)), file=sys.stderr)74 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)75 return None76 elif __verbose:77 print(' '.join(output))78 return output79def sync_repo(path):80 # Sync existing mirror with new changes81 p = Popen([svnsync_bin,82 'sync',83 '--username',84 user_name,85 '--password',86 pass_word,87 '--config-option=servers:global:http-library={}'.format(svn_http_client),88 'file://{}'.format(path)], stdout=PIPE, stderr=PIPE)89 output = []90 error = []91 while p.poll() is None:92 sleep(1)93 append_to_list(output, process_output(p.stdout))94 append_to_list(error, process_output(p.stderr))95 append_to_list(output, process_output(p.stdout))96 append_to_list(error, process_output(p.stderr))97 if not p.returncode == 0:98 print("Svnsync sync failed with return code {}".format(p.returncode), file=sys.stderr)99 
print("Process Output: {}".format(' '.join(output)), file=sys.stderr)100 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)101 return False102 elif __verbose:103 print(' '.join(output))104 return True105def create_sync_repo(path, url):106 # Create new repository107 p = Popen([svnadmin_bin,108 'create',109 path], stdout=PIPE, stderr=PIPE)110 output = []111 error = []112 while p.poll() is None:113 sleep(1)114 append_to_list(output, process_output(p.stdout))115 append_to_list(error, process_output(p.stderr))116 append_to_list(output, process_output(p.stdout))117 append_to_list(error, process_output(p.stderr))118 if not p.returncode == 0:119 print("Svnadmin create failed with return code {}".format(p.returncode), file=sys.stderr)120 print("Process Output: {}".format(' '.join(output)), file=sys.stderr)121 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)122 return False123 if __verbose:124 print(' '.join(output))125 revprop_path = None126 # Prep for svnsync127 try:128 # Make sure pre-revprop-change has content129 revprop_path = "{}/hooks/pre-revprop-change".format(path)130 with open(revprop_path, 'w') as out:131 out.write("#!/bin/sh")132 p = Popen(['chmod',133 '755',134 revprop_path], stdout=PIPE, stderr=PIPE)135 # Safely process the output136 output = []137 error = []138 while p.poll() is None:139 sleep(1)140 append_to_list(output, process_output(p.stdout))141 append_to_list(error, process_output(p.stderr))142 append_to_list(output, process_output(p.stdout))143 append_to_list(error, process_output(p.stderr))144 if not p.returncode == 0:145 print("chmod 755 failed with return code {}".format(p.returncode), file=sys.stderr)146 print("Process Output: {}".format(' '.join(output)), file=sys.stderr)147 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)148 return False149 elif __verbose:150 print(' '.join(output))151 except IOError as err:152 print("An error occurred while writing to {} with error code {}: {}".format(153 
revprop_path, err.errno, err.output), file=sys.stderr)154 return False155 # Now we're ready for svnsync init156 p = Popen([svnsync_bin,157 'init',158 '--username',159 user_name,160 '--password',161 pass_word,162 'file://{}'.format(path),163 url], stdout=PIPE, stderr=PIPE)164 # Safely process the output165 output = []166 error = []167 while p.poll() is None:168 sleep(1)169 append_to_list(output, process_output(p.stdout))170 append_to_list(error, process_output(p.stderr))171 append_to_list(output, process_output(p.stdout))172 append_to_list(error, process_output(p.stderr))173 if not (p.returncode == 0 or p.returncode == 1):174 print("Svnsync init failed with return code {}".format(p.returncode), file=sys.stderr)175 print("Process Output: {}".format(' '.join(output)), file=sys.stderr)176 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)177 return False178 elif __verbose:179 print(p.stdout.readlines())180 p = Popen([svnlook_bin,181 'pg',182 '--revprop',183 '-r0',184 path,185 r'svn:sync-from-uuid'], stdout=PIPE, stderr=PIPE)186 # Safely process the output187 output = []188 error = []189 while p.poll() is None:190 sleep(1)191 append_to_list(output, process_output(p.stdout))192 append_to_list(error, process_output(p.stderr))193 append_to_list(output, process_output(p.stdout))194 append_to_list(error, process_output(p.stderr))195 if not p.returncode == 0:196 print("Svnlook pg failed with return code {}".format(p.returncode), file=sys.stderr)197 print("Process Output: {}".format(' '.join(output)), file=sys.stderr)198 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)199 return False200 # Don't worry about displaying the output of svnlook pg, only contains a uuid if it succeeds201 # Speaking of which, set uuid to the output202 uuid = ''.join(output)203 p = Popen([svnadmin_bin,204 'setuuid',205 path,206 uuid], stdout=PIPE, stderr=PIPE)207 # Safely process the output208 output = []209 error = []210 while p.poll() is None:211 sleep(1)212 
append_to_list(output, process_output(p.stdout))213 append_to_list(error, process_output(p.stderr))214 append_to_list(output, process_output(p.stdout))215 append_to_list(error, process_output(p.stderr))216 if not p.returncode == 0:217 print("Svnadmin setuuid failed with return code {}".format(p.returncode), file=sys.stderr)218 print("Process Output: {}".format(' '.join(output)), file=sys.stderr)219 print("Error Output: {}".format(' '.join(error)), file=sys.stderr)220 return False221 elif __verbose:222 print(' '.join(output))223 return True224# Main Function225def main():226 print("Enumerating directories from {}".format(server_name))227 names = get_remote_dir_names()228 if names is None:229 sys.exit(-2)...

Full Screen

Full Screen

backend_kalman_filter.py

Source:backend_kalman_filter.py Github

copy

Full Screen

1import numpy as np2import streamlit as st3import plotly.express as px4import plotly.graph_objects as go5from pandas import DataFrame as df6class FrontEndUtils:7 def filter_plot(self, process_output):8 plot_df = df(process_output)9 output_fig = px.line(10 plot_df,11 x="Time",12 y=[13 "Error Estimation",14 "Angle Estimation",15 "Zero Order Sensor",16 "Gyroscope Angle",17 ],18 )19 gain_fig = px.line(plot_df, x="Iteration", y=["Kalman Gain 1", "Kalman Gain 2"])20 st.markdown("### Comparing Output")21 st.plotly_chart(output_fig, use_container_width=True)22 st.markdown("### Gain Update")23 st.plotly_chart(gain_fig, use_container_width=True)24 def plot_best_kalman(self, best_kalman):25 z = best_kalman["RMSE"]26 best_kalman_df = df({"n": best_kalman["n"], "e": best_kalman["e"]})27 fig = go.Figure(28 data=[go.Surface(z=z, x=best_kalman_df["n"], y=best_kalman_df["e"])]29 )30 fig.update_layout(xaxis_title="n", yaxis_title="e")31 st.plotly_chart(fig, use_container_width=True)32class BackEndUtils:33 def padding(self, x):34 log = np.log2(len(x))35 return np.pad(36 x, (0, int(2 ** ((log - log % 1) + 1) - len(x))), mode="constant"37 ).flatten()38 def FFT(self, x):39 if np.log2(len(x)) % 1 > 0:40 x = self.padding(x)41 x = np.asarray(x, dtype=float)42 N = x.shape[0]43 N_min = min(N, 2)44 # DFT on all length-N_min sub-problems at once45 n = np.arange(N_min)46 k = n[:, None]47 W = np.exp(-2j * np.pi * n * k / N_min)48 X = np.dot(W, x.reshape((N_min, -1)))49 # Recursive calculation all at once50 while X.shape[0] < N:51 X_even = X[:, : int(X.shape[1] / 2)]52 X_odd = X[:, int(X.shape[1] / 2) :]53 factor = np.exp(-1j * np.pi * np.arange(X.shape[0]) / X.shape[0])[:, None]54 factor.shape, factor55 X = np.vstack([X_even + factor * X_odd, X_even - factor * X_odd])56 return X.flatten()57 @st.cache58 def load_data(self):59 raw_data = np.loadtxt("trial1.dat")60 data = {}61 data["Time"] = raw_data[:, 0].flatten()62 data["Zero Order Sensor"] = raw_data[:, 1].flatten()63 data["Gyroscope"] = 
raw_data[:, 3].flatten()64 data["Tilt Sensor"] = raw_data[:, 4].flatten()65 return data66 def rmse(self, y_true, y_pred):67 return np.sqrt(np.mean((y_true - y_pred) ** 2))68class KalmanFilter(BackEndUtils):69 def __init__(self):70 self.data = self.load_data()71 self.dtm = self.data["Gyroscope"] - self.data["Tilt Sensor"]72 def fast_process(self, kalman_filter_state):73 A = np.array([[1, 1], [0, 1]])74 B = np.array([[1], [1]])75 n = kalman_filter_state["n"]76 e = kalman_filter_state["e"]77 post_error_covar = np.zeros((2, 2))78 kalman_gain = np.zeros((2, len(self.dtm)))79 d_theta_b_h_min = np.zeros((2, len(self.dtm) + 1))80 d_theta_b_h = np.zeros((2, len(self.dtm)))81 for i in np.arange(len(self.dtm)):82 # Priory Covar Calculation83 priori_error_covar = np.dot(A, np.dot(post_error_covar, A.T)) + B * B.T * e84 # Kalman Gain Calculation85 kalman_gain[:, i] = priori_error_covar[:, 0] / (86 priori_error_covar[0, 0] + n87 )88 # Kalman Filtering (Estimation)89 v = self.dtm[i] - d_theta_b_h_min[0, i]90 d_theta_b_h[:, i] = d_theta_b_h_min[:, i] + kalman_gain[:, i] * v91 # Post Covar Update92 post_copy = post_error_covar.copy()93 post_error_covar[0, :] = post_copy[0, :] * np.array([1, -kalman_gain[0, i]])94 post_error_covar[1, :] = np.dot(95 np.array([-kalman_gain[1, i], 1]), post_copy96 )97 # Prediction98 d_theta_b_h_min[:, i + 1] = np.dot(99 np.array([[1, 1], [0, 1]]), d_theta_b_h[:, i].reshape(-1, 1)100 ).flatten()101 return {102 "RMSE": self.rmse(103 self.data["Zero Order Sensor"],104 self.data["Gyroscope"] - d_theta_b_h[0, :],105 )106 }107 def process(self, kalman_filter_state):108 process_output = {}109 theta = self.data["Gyroscope"]110 # Input Difference111 dtm = self.data["Gyroscope"] - self.data["Tilt Sensor"]112 filter_output = self.kalman_filter(dtm, kalman_filter_state)113 process_output = filter_output114 process_output["Angle Estimation"] = theta - filter_output["Error Estimation"]115 process_output["Zero Order Sensor"] = self.data["Zero Order Sensor"]116 
process_output["Gyroscope Angle"] = self.data["Gyroscope"]117 process_output["Tilt Sensor"] = self.data["Tilt Sensor"]118 process_output["RMSE"] = self.rmse(119 process_output["Zero Order Sensor"], process_output["Angle Estimation"]120 )121 return process_output122 def kalman_filter(self, input_filter, kalman_filter_state):123 # Initialize124 dtm = input_filter125 A = np.array([[1, 1], [0, 1]])126 B = np.array([[1], [1]])127 n = kalman_filter_state["n"]128 e = kalman_filter_state["e"]129 post_error_covar = np.zeros((2, 2))130 kalman_gain = np.zeros((2, len(dtm)))131 d_theta_b_h_min = np.zeros((2, len(dtm) + 1))132 d_theta_b_h = np.zeros((2, len(dtm)))133 for i in np.arange(len(dtm)):134 # Priory Covar Calculation135 priori_error_covar = np.dot(A, np.dot(post_error_covar, A.T)) + B * B.T * e136 # Kalman Gain Calculation137 kalman_gain[:, i] = priori_error_covar[:, 0] / (138 priori_error_covar[0, 0] + n139 )140 # Kalman Filtering (Estimation)141 v = dtm[i] - d_theta_b_h_min[0, i]142 d_theta_b_h[:, i] = d_theta_b_h_min[:, i] + kalman_gain[:, i] * v143 # Post Covar Update144 post_copy = post_error_covar.copy()145 post_error_covar[0, :] = post_copy[0, :] * np.array([1, -kalman_gain[0, i]])146 post_error_covar[1, :] = np.dot(147 np.array([-kalman_gain[1, i], 1]), post_copy148 )149 # Prediction150 d_theta_b_h_min[:, i + 1] = np.dot(151 np.array([[1, 1], [0, 1]]), d_theta_b_h[:, i].reshape(-1, 1)152 ).flatten()153 filter_output = {}154 filter_output["Kalman Gain 1"] = kalman_gain[0, :]155 filter_output["Kalman Gain 2"] = kalman_gain[1, :]156 filter_output["Error Estimation"] = d_theta_b_h[0, :]157 filter_output["Prediction"] = d_theta_b_h_min[0, :-1]158 filter_output["Time"] = self.data["Time"]159 filter_output["Iteration"] = np.arange(len(dtm))160 return filter_output161 def best_kalman_parameter_search(self, best_parameter_state):162 range_n_start = best_parameter_state["N Start"]163 range_n_end = best_parameter_state["N End"]164 range_e_start = best_parameter_state["E 
Start"]165 range_e_end = best_parameter_state["E End"]166 data_amount = best_parameter_state["Data Amount"]167 z_output = np.zeros((data_amount, data_amount))168 n = np.linspace(range_n_start, range_n_end, data_amount)169 e = np.linspace(range_e_start, range_e_end, data_amount)170 progress_bar = st.progress(0.0)171 amount = 0.0172 for i in range(data_amount):173 for j in range(data_amount):174 process_output = self.fast_process({"n": n[i], "e": e[j]})175 z_output[i, j] = process_output["RMSE"]176 amount += 1177 progress_bar.progress((amount) / (data_amount ** 2))178 # st.write(z_output)...

Full Screen

Full Screen

exec_command.py

Source:exec_command.py Github

copy

Full Screen

...161 raise Exception(msg)162 return error_code163################################################################################164#165def print_process_output(166 process_output,167 suppress_success_output = False,168):169 if suppress_success_output:170 if "code" in process_output.keys():171 if process_output["code"] == 0:172 return173 if "stdout" in process_output.keys():174 # Use stderr to print stdout of the command.175 # If stdout results were captured,176 # they are not meant to be on stdout by default.177 logging.info(process_output["stdout"])178 if "stderr" in process_output.keys():179 logging.info(process_output["stderr"])...

Full Screen

Full Screen

views.py

Source:views.py Github

copy

Full Screen

1from rest_framework.decorators import api_view2from rest_framework import status3from rest_framework.response import Response4from mvp_cd.deploy_app.models import (BuildInfo, ServerBuildInfo, ServerType)5import subprocess6from django.shortcuts import render_to_response7import json8from django.shortcuts import get_object_or_4049ALLOWED_BRANCHES = ['staging', 'production']10@api_view(['POST'])11def deploy_view(request):12 if request.data['payload']['outcome'] == 'success':13 if request.data['payload']['branch'] not in ALLOWED_BRANCHES:14 return Response(status=status.HTTP_204_NO_CONTENT)15 vcs_revision = request.data['payload']['vcs_revision']16 build_info_obj = BuildInfo(17 circleci_json=request.data,18 branch=request.data['payload']['branch'],19 commiter=request.data['payload']['author_name'],20 circleci_url=request.data['payload']['build_url'],21 vcs_revision=vcs_revision)22 build_info_obj.save()23 with open('deploy_config.json') as data_file:24 deploy_config = json.load(data_file)25 pem_path = deploy_config['pem_path']26 deployment_path = deploy_config['deployment_path']27 if request.data['payload']['branch'] == 'staging':28 for ip in deploy_config['staging_celery_ips']:29 deploy_celery_staging(30 request, ip, pem_path, deployment_path,31 build_info_obj, vcs_revision)32 for ip in deploy_config['staging_app_server_ips']:33 deploy_app_staging(34 request, ip, pem_path, deployment_path,35 build_info_obj, vcs_revision)36 return Response(status=status.HTTP_204_NO_CONTENT)37 elif request.data['payload']['branch'] == 'production':38 for ip in deploy_config['production_celery_ips']:39 deploy_celery_production(40 request, ip, pem_path, deployment_path,41 build_info_obj, vcs_revision)42 for ip in deploy_config['production_app_server_ips']:43 deploy_app_production(44 request, ip, pem_path, deployment_path,45 build_info_obj, vcs_revision)46 return Response(status=status.HTTP_204_NO_CONTENT)47 return Response(status=status.HTTP_400_BAD_REQUEST)48def 
build_list_view(request):49 build_info = BuildInfo.objects.all().order_by('-id')[:10]50 return render_to_response(51 'build_list.html', {'build_list': build_info})52def build_detail_view(request, id):53 build_info = get_object_or_404(BuildInfo, id=id)54 serverbuildinfo_list = build_info.serverbuildinfo_set.all().order_by('-id')55 return render_to_response(56 'build_info.html',57 {'build_info_item': build_info,58 'serverbuildinfo_list': serverbuildinfo_list})59def deploy_celery_staging(60 request, ip, pem_path, deployment_path,61 build_info_obj, vcs_revision):62 try:63 process_output = subprocess.check_output(64 ['bash', 'deploy_worker.sh', pem_path,65 ip, deployment_path, vcs_revision, 'mvpserver.settings.staging'],66 stdin=subprocess.PIPE)67 process_status(68 request, process_output, 0, ip,69 ServerType.WORKER, build_info_obj)70 except subprocess.CalledProcessError as e:71 process_status(72 request, e.output, 1, ip,73 ServerType.WORKER, build_info_obj)74def deploy_celery_production(75 request, ip, pem_path, deployment_path,76 build_info_obj, vcs_revision):77 try:78 process_output = subprocess.check_output(79 ['bash', 'deploy_worker.sh', pem_path, ip, deployment_path,80 vcs_revision, 'mvpserver.settings.production'],81 stdin=subprocess.PIPE)82 process_status(83 request, process_output, 0, ip,84 ServerType.WORKER, build_info_obj)85 except subprocess.CalledProcessError as e:86 process_status(87 request, e.output, 1, ip,88 ServerType.WORKER, build_info_obj)89def deploy_app_staging(90 request, ip, pem_path, deployment_path,91 build_info_obj, vcs_revision):92 try:93 process_output = subprocess.check_output(94 ['bash', 'deploy_app.sh', pem_path, ip, deployment_path,95 vcs_revision, 'mvpserver.settings.staging'],96 stdin=subprocess.PIPE)97 process_status(98 request, process_output, 0, ip,99 ServerType.APP_SERVER, build_info_obj)100 except subprocess.CalledProcessError as e:101 process_status(102 request, e.output, 1, ip,103 ServerType.APP_SERVER, build_info_obj)104def 
deploy_app_production(105 request, ip, pem_path, deployment_path,106 build_info_obj, vcs_revision):107 try:108 process_output = subprocess.check_output(109 ['bash', 'deploy_app.sh', pem_path, ip, deployment_path,110 vcs_revision, 'mvpserver.settings.production'],111 stdin=subprocess.PIPE)112 process_status(113 request, process_output, 0, ip,114 ServerType.APP_SERVER, build_info_obj)115 except subprocess.CalledProcessError as e:116 process_status(117 request, e.output, 1, ip,118 ServerType.APP_SERVER, build_info_obj)119def process_status(120 request, process_output, process_status,121 ip, server_type, build_info_obj):122 print(process_output)123 if process_status == 0:124 build_info_item = ServerBuildInfo(125 is_success=True,126 console_output=process_output,127 ip=ip,128 server_type=server_type,129 build_info=build_info_obj)130 build_info_obj.deployment_status = True131 build_info_obj.save()132 build_info_item.save()133 else:134 build_info_item = ServerBuildInfo(135 is_success=False,136 console_output=process_output,137 ip=ip,138 server_type=server_type,139 build_info=build_info_obj)140 build_info_obj.deployment_status = False141 build_info_obj.save()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to run automation tests from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run autotest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation testing FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful