Best Python code snippet using autotest_python
do-svnsync.py
Source:do-svnsync.py  
# NOTE: this listing opens part-way through an earlier validation function.
# Only its tail is visible, so it is preserved as a comment instead of being
# reconstructed:
#         return "Can't find svnlook binary at {}".format(svnlook_bin)
#     if not os.path.isdir(local_repo_directory):
#         return "Can't find local repository directory at {}".format(local_repo_directory)
#     return None


def _decode_tokens(raw):
    """Split raw bytes on ASCII whitespace and decode every token to str."""
    return [token.decode() for token in raw.split()]


def process_output(file):
    """Read a binary stream to EOF and return its whitespace-separated tokens.

    :param file: readable binary file object (e.g. a subprocess pipe).
    :return: list of str tokens; empty when the stream holds no data.
    """
    return _decode_tokens(file.read())


def append_to_list(original_list, list_to_append):
    """Append every item of ``list_to_append`` to ``original_list`` in place."""
    original_list.extend(list_to_append)


def _run_command(args, failure_summary, ok_codes=(0,), echo=True):
    """Run ``args`` to completion and return its stdout tokens.

    ``communicate()`` drains stdout and stderr together, so a child that
    writes a lot to stderr cannot block on a full pipe while we wait on
    stdout, and the ``with`` block closes both pipes afterwards.

    :param args: argument vector handed to ``Popen``.
    :param failure_summary: text printed before " with return code N".
    :param ok_codes: return codes that count as success.
    :param echo: when true, print the stdout tokens if ``__verbose`` is set.
    :return: list of stdout tokens, or ``None`` after reporting a failure.
    """
    with Popen(args, stdout=PIPE, stderr=PIPE) as p:
        raw_out, raw_err = p.communicate()
    output = _decode_tokens(raw_out)
    if p.returncode not in ok_codes:
        error = _decode_tokens(raw_err)
        print("{} with return code {}".format(failure_summary, p.returncode), file=sys.stderr)
        print("Process Output: {}".format(' '.join(output)), file=sys.stderr)
        print("Error Output: {}".format(' '.join(error)), file=sys.stderr)
        return None
    if echo and __verbose:
        print(' '.join(output))
    return output


def get_remote_dir_names():
    """List the directory names under the remote svn repository root over ssh.

    :return: list of names printed by the remote ``find``, or ``None`` when
        the ssh command exits with a non-zero code.
    """
    return _run_command(
        [ssh_bin,
         '{}@{}'.format(ssh_user_name, server_name),
         'find',
         '/var/lib/scm/repositories/svn/',
         '-maxdepth', '1',
         '-type', 'd',
         # Same runtime value as before (backslash + semicolon), written
         # with an explicit escape instead of the invalid sequence '\;'.
         '-exec', 'basename {} \\;'],
        "Failed to list remote directories")


def sync_repo(path):
    """Sync the existing mirror at ``path`` with new upstream changes.

    :param path: filesystem path of the local mirror repository.
    :return: ``True`` when ``svnsync sync`` exits with 0, otherwise ``False``.
    """
    # NOTE(review): the password is passed on the command line, as in the
    # original; confirm that is acceptable on the host running this script.
    tokens = _run_command(
        [svnsync_bin,
         'sync',
         '--username',
         user_name,
         '--password',
         pass_word,
         '--config-option=servers:global:http-library={}'.format(svn_http_client),
         'file://{}'.format(path)],
        "Svnsync sync failed")
    return tokens is not None


def create_sync_repo(path, url):
    """Create a new local mirror at ``path`` and initialise it from ``url``.

    Steps: ``svnadmin create``, write an executable ``pre-revprop-change``
    hook, ``svnsync init``, then copy the source uuid (read with
    ``svnlook pg``) onto the mirror with ``svnadmin setuuid``.

    :param path: filesystem path for the new repository.
    :param url: source repository URL passed to ``svnsync init``.
    :return: ``True`` when every step succeeds, otherwise ``False``.
    """
    # Create new repository
    if _run_command([svnadmin_bin, 'create', path], "Svnadmin create failed") is None:
        return False

    # Prep for svnsync: make sure pre-revprop-change has content
    revprop_path = "{}/hooks/pre-revprop-change".format(path)
    try:
        with open(revprop_path, 'w') as out:
            out.write("#!/bin/sh")
        if _run_command(['chmod', '755', revprop_path], "chmod 755 failed") is None:
            return False
    except IOError as err:
        # IOError carries errno/strerror; the previous ``err.output`` lookup
        # raised AttributeError inside this handler.
        print("An error occurred while writing to {} with error code {}: {}".format(
            revprop_path, err.errno, err.strerror), file=sys.stderr)
        return False

    # Now we're ready for svnsync init; return code 1 is tolerated as before.
    # Verbose mode now prints the captured output; it used to re-read the
    # already drained pipe and therefore always printed an empty list.
    init_tokens = _run_command(
        [svnsync_bin,
         'init',
         '--username',
         user_name,
         '--password',
         pass_word,
         'file://{}'.format(path),
         url],
        "Svnsync init failed",
        ok_codes=(0, 1))
    if init_tokens is None:
        return False

    # svnlook pg prints only the uuid on success, so it is not echoed.
    uuid_tokens = _run_command(
        [svnlook_bin,
         'pg',
         '--revprop',
         '-r0',
         path,
         r'svn:sync-from-uuid'],
        "Svnlook pg failed",
        echo=False)
    if uuid_tokens is None:
        return False
    uuid = ''.join(uuid_tokens)

    return _run_command(
        [svnadmin_bin, 'setuuid', path, uuid],
        "Svnadmin setuuid failed") is not None


# Main Function
def main():
    print("Enumerating directories from {}".format(server_name))
    names = get_remote_dir_names()
    if names is None:
        sys.exit(-2)
    # ... the remainder of main() is cut off in this listing
    # (the next listing on the page is backend_kalman_filter.py)
Source:backend_kalman_filter.py  
import numpy as np
import streamlit as st
import plotly.express as px
import plotly.graph_objects as go
from pandas import DataFrame as df


class FrontEndUtils:
    """Streamlit/Plotly rendering helpers for the filter results."""

    def filter_plot(self, process_output):
        """Render the signal comparison chart and the gain chart.

        :param process_output: mapping convertible to a DataFrame that holds
            the columns plotted below ("Time", "Iteration", the four signal
            columns and the two Kalman gain columns).
        """
        plot_df = df(process_output)
        output_fig = px.line(
            plot_df,
            x="Time",
            y=[
                "Error Estimation",
                "Angle Estimation",
                "Zero Order Sensor",
                "Gyroscope Angle",
            ],
        )
        gain_fig = px.line(plot_df, x="Iteration", y=["Kalman Gain 1", "Kalman Gain 2"])
        st.markdown("### Comparing Output")
        st.plotly_chart(output_fig, use_container_width=True)
        st.markdown("### Gain Update")
        st.plotly_chart(gain_fig, use_container_width=True)

    def plot_best_kalman(self, best_kalman):
        """Render a surface of ``best_kalman["RMSE"]`` over its "n" and "e" values."""
        z = best_kalman["RMSE"]
        best_kalman_df = df({"n": best_kalman["n"], "e": best_kalman["e"]})
        fig = go.Figure(
            data=[go.Surface(z=z, x=best_kalman_df["n"], y=best_kalman_df["e"])]
        )
        fig.update_layout(xaxis_title="n", yaxis_title="e")
        st.plotly_chart(fig, use_container_width=True)


class BackEndUtils:
    """Numeric helpers: zero padding, FFT, data loading and RMSE."""

    def padding(self, x):
        """Right-pad ``x`` with zeros to length ``2 ** (floor(log2(len(x))) + 1)``."""
        log = np.log2(len(x))
        return np.pad(
            x, (0, int(2 ** ((log - log % 1) + 1) - len(x))), mode="constant"
        ).flatten()

    def FFT(self, x):
        """Return the discrete Fourier transform of ``x``.

        Inputs whose length is not a power of two are zero-padded with
        :meth:`padding` first, so the result has the padded length.
        """
        if np.log2(len(x)) % 1 > 0:
            x = self.padding(x)
        x = np.asarray(x, dtype=float)
        N = x.shape[0]
        N_min = min(N, 2)
        # DFT on all length-N_min sub-problems at once
        n = np.arange(N_min)
        k = n[:, None]
        W = np.exp(-2j * np.pi * n * k / N_min)
        X = np.dot(W, x.reshape((N_min, -1)))
        # Recursive calculation all at once: each pass doubles the row count
        while X.shape[0] < N:
            half = X.shape[1] // 2
            X_even = X[:, :half]
            X_odd = X[:, half:]
            factor = np.exp(-1j * np.pi * np.arange(X.shape[0]) / X.shape[0])[:, None]
            X = np.vstack([X_even + factor * X_odd, X_even - factor * X_odd])
        return X.flatten()

    # NOTE(review): recent Streamlit releases deprecate ``st.cache`` in favour
    # of ``st.cache_data`` — confirm against the pinned Streamlit version.
    @st.cache
    def load_data(self):
        """Load ``trial1.dat`` and return columns 0, 1, 3 and 4 as named arrays."""
        raw_data = np.loadtxt("trial1.dat")
        data = {}
        data["Time"] = raw_data[:, 0].flatten()
        data["Zero Order Sensor"] = raw_data[:, 1].flatten()
        data["Gyroscope"] = raw_data[:, 3].flatten()
        data["Tilt Sensor"] = raw_data[:, 4].flatten()
        return data

    def rmse(self, y_true, y_pred):
        """Return the root-mean-square error between ``y_true`` and ``y_pred``."""
        return np.sqrt(np.mean((y_true - y_pred) ** 2))


class KalmanFilter(BackEndUtils):
    """Two-state filter applied to the gyroscope/tilt-sensor difference."""

    def __init__(self):
        self.data = self.load_data()
        # Filter input: difference between the gyroscope and tilt readings.
        self.dtm = self.data["Gyroscope"] - self.data["Tilt Sensor"]

    def _run_filter(self, dtm, n, e):
        """Run the filter recursion shared by ``fast_process`` and ``kalman_filter``.

        :param dtm: 1-D sequence of input samples.
        :param n: value added to the prior covariance in the gain denominator.
        :param e: scale of the all-ones 2x2 term added to the prior covariance.
        :return: ``(kalman_gain, d_theta_b_h, d_theta_b_h_min)`` with shapes
            ``(2, len(dtm))``, ``(2, len(dtm))`` and ``(2, len(dtm) + 1)``.
        """
        A = np.array([[1, 1], [0, 1]])
        B = np.array([[1], [1]])
        # Loop-invariant, so computed once instead of on every iteration.
        process_term = B * B.T * e
        samples = len(dtm)
        post_error_covar = np.zeros((2, 2))
        kalman_gain = np.zeros((2, samples))
        d_theta_b_h_min = np.zeros((2, samples + 1))
        d_theta_b_h = np.zeros((2, samples))
        for i in range(samples):
            # Priory Covar Calculation
            priori_error_covar = np.dot(A, np.dot(post_error_covar, A.T)) + process_term
            # Kalman Gain Calculation
            kalman_gain[:, i] = priori_error_covar[:, 0] / (
                priori_error_covar[0, 0] + n
            )
            # Kalman Filtering (Estimation)
            v = dtm[i] - d_theta_b_h_min[0, i]
            d_theta_b_h[:, i] = d_theta_b_h_min[:, i] + kalman_gain[:, i] * v
            # Post Covar Update
            # NOTE(review): this update reads only the previous posterior,
            # which starts as zeros, so for finite gains the posterior stays
            # all-zero and the gain is the constant e / (e + n). Confirm
            # whether the prior covariance was meant to be used here.
            post_copy = post_error_covar.copy()
            post_error_covar[0, :] = post_copy[0, :] * np.array([1, -kalman_gain[0, i]])
            post_error_covar[1, :] = np.dot(
                np.array([-kalman_gain[1, i], 1]), post_copy
            )
            # Prediction
            d_theta_b_h_min[:, i + 1] = np.dot(A, d_theta_b_h[:, i])
        return kalman_gain, d_theta_b_h, d_theta_b_h_min

    def fast_process(self, kalman_filter_state):
        """Return only the RMSE for one ``{"n": ..., "e": ...}`` parameter pair."""
        _, d_theta_b_h, _ = self._run_filter(
            self.dtm, kalman_filter_state["n"], kalman_filter_state["e"]
        )
        return {
            "RMSE": self.rmse(
                self.data["Zero Order Sensor"],
                self.data["Gyroscope"] - d_theta_b_h[0, :],
            )
        }

    def process(self, kalman_filter_state):
        """Run the filter and return its outputs plus the signals used for plotting.

        :param kalman_filter_state: mapping with the keys "n" and "e".
        :return: the dict from :meth:`kalman_filter` extended with
            "Angle Estimation", "Zero Order Sensor", "Gyroscope Angle",
            "Tilt Sensor" and "RMSE".
        """
        theta = self.data["Gyroscope"]
        # Input Difference
        dtm = self.data["Gyroscope"] - self.data["Tilt Sensor"]
        process_output = self.kalman_filter(dtm, kalman_filter_state)
        process_output["Angle Estimation"] = theta - process_output["Error Estimation"]
        process_output["Zero Order Sensor"] = self.data["Zero Order Sensor"]
        process_output["Gyroscope Angle"] = self.data["Gyroscope"]
        process_output["Tilt Sensor"] = self.data["Tilt Sensor"]
        process_output["RMSE"] = self.rmse(
            process_output["Zero Order Sensor"], process_output["Angle Estimation"]
        )
        return process_output

    def kalman_filter(self, input_filter, kalman_filter_state):
        """Filter ``input_filter`` and return gains, estimates and predictions.

        :param input_filter: 1-D sequence of input samples.
        :param kalman_filter_state: mapping with the keys "n" and "e".
        :return: dict with "Kalman Gain 1", "Kalman Gain 2",
            "Error Estimation", "Prediction", "Time" and "Iteration".
        """
        dtm = input_filter
        kalman_gain, d_theta_b_h, d_theta_b_h_min = self._run_filter(
            dtm, kalman_filter_state["n"], kalman_filter_state["e"]
        )
        filter_output = {}
        filter_output["Kalman Gain 1"] = kalman_gain[0, :]
        filter_output["Kalman Gain 2"] = kalman_gain[1, :]
        filter_output["Error Estimation"] = d_theta_b_h[0, :]
        filter_output["Prediction"] = d_theta_b_h_min[0, :-1]
        filter_output["Time"] = self.data["Time"]
        filter_output["Iteration"] = np.arange(len(dtm))
        return filter_output

    def best_kalman_parameter_search(self, best_parameter_state):
        """Evaluate the RMSE on a grid of (n, e) pairs, updating a progress bar.

        Only the part of this method visible in the listing is reproduced;
        the listing is cut off after the grid loop.
        """
        range_n_start = best_parameter_state["N Start"]
        range_n_end = best_parameter_state["N End"]
        range_e_start = best_parameter_state["E Start"]
        range_e_end = best_parameter_state["E End"]
        data_amount = best_parameter_state["Data Amount"]
        z_output = np.zeros((data_amount, data_amount))
        n = np.linspace(range_n_start, range_n_end, data_amount)
        e = np.linspace(range_e_start, range_e_end, data_amount)
        progress_bar = st.progress(0.0)
        amount = 0.0
        for i in range(data_amount):
            for j in range(data_amount):
                process_output = self.fast_process({"n": n[i], "e": e[j]})
                z_output[i, j] = process_output["RMSE"]
                amount += 1
                progress_bar.progress((amount) / (data_amount ** 2))
        # st.write(z_output)
        # ... the remainder of this method is cut off in this listing
        # (the next listing on the page is exec_command.py)
Source:exec_command.py  
# NOTE: this listing opens with the tail of an earlier function; it is
# preserved as a comment because the rest of that function is not visible:
#             raise Exception(msg)
#     return error_code

################################################################################
#
def print_process_output(
    process_output,
    suppress_success_output = False,
):
    """Log the captured "stdout" and "stderr" entries of a command result.

    :param process_output: mapping that may hold "code", "stdout" and
        "stderr" entries.
    :param suppress_success_output: when true, log nothing if the mapping
        has a "code" entry equal to 0.
    """
    finished_cleanly = "code" in process_output and process_output["code"] == 0
    if suppress_success_output and finished_cleanly:
        return

    # Captured stdout goes through logging as well: if it was captured,
    # it is not meant to land on this process's stdout by default.
    for stream_name in ("stdout", "stderr"):
        if stream_name in process_output:
            logging.info(process_output[stream_name])
    # (the next listing on the page is views.py)
Source:views.py  
from rest_framework.decorators import api_view
from rest_framework import status
from rest_framework.response import Response
from mvp_cd.deploy_app.models import (BuildInfo, ServerBuildInfo, ServerType)
import subprocess
from django.shortcuts import render_to_response
import json
from django.shortcuts import get_object_or_404

ALLOWED_BRANCHES = ['staging', 'production']


@api_view(['POST'])
def deploy_view(request):
    """Handle a CI webhook: record the build and deploy it to the branch's servers.

    Responds 400 when the build outcome is not 'success' or the payload is
    missing a required field, and 204 when the branch is not deployable or
    after the deployment scripts have run.
    """
    try:
        payload = request.data['payload']
        if payload['outcome'] != 'success':
            return Response(status=status.HTTP_400_BAD_REQUEST)
        branch = payload['branch']
        if branch not in ALLOWED_BRANCHES:
            return Response(status=status.HTTP_204_NO_CONTENT)
        vcs_revision = payload['vcs_revision']
        commiter = payload['author_name']
        circleci_url = payload['build_url']
    except (KeyError, TypeError):
        # A malformed webhook body is a client error, not a server crash.
        return Response(status=status.HTTP_400_BAD_REQUEST)

    build_info_obj = BuildInfo(
        circleci_json=request.data,
        branch=branch,
        commiter=commiter,
        circleci_url=circleci_url,
        vcs_revision=vcs_revision)
    build_info_obj.save()

    with open('deploy_config.json') as data_file:
        deploy_config = json.load(data_file)
    pem_path = deploy_config['pem_path']
    deployment_path = deploy_config['deployment_path']

    # Per branch: (config key holding the ips, deploy function), run in order.
    targets = {
        'staging': (
            ('staging_celery_ips', deploy_celery_staging),
            ('staging_app_server_ips', deploy_app_staging),
        ),
        'production': (
            ('production_celery_ips', deploy_celery_production),
            ('production_app_server_ips', deploy_app_production),
        ),
    }.get(branch)
    if targets is None:
        return Response(status=status.HTTP_400_BAD_REQUEST)

    for config_key, deploy in targets:
        for ip in deploy_config[config_key]:
            deploy(
                request, ip, pem_path, deployment_path,
                build_info_obj, vcs_revision)
    return Response(status=status.HTTP_204_NO_CONTENT)


# NOTE(review): ``render_to_response`` is removed in newer Django releases;
# confirm the pinned Django version before upgrading.
def build_list_view(request):
    """Render the ten most recent builds."""
    build_info = BuildInfo.objects.all().order_by('-id')[:10]
    return render_to_response(
        'build_list.html', {'build_list': build_info})


def build_detail_view(request, id):
    """Render one build with its per-server results, newest first; 404 if missing."""
    build_info = get_object_or_404(BuildInfo, id=id)
    serverbuildinfo_list = build_info.serverbuildinfo_set.all().order_by('-id')
    return render_to_response(
        'build_info.html',
        {'build_info_item': build_info,
         'serverbuildinfo_list': serverbuildinfo_list})


def _run_deploy_script(
        request, script, settings_module, server_type,
        ip, pem_path, deployment_path, build_info_obj, vcs_revision):
    """Run one deploy script through bash and record its outcome."""
    try:
        process_output = subprocess.check_output(
            ['bash', script, pem_path, ip, deployment_path,
             vcs_revision, settings_module],
            stdin=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        process_status(
            request, e.output, 1, ip,
            server_type, build_info_obj)
    else:
        process_status(
            request, process_output, 0, ip,
            server_type, build_info_obj)


def deploy_celery_staging(
        request, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision):
    """Deploy the worker to ``ip`` with the staging settings."""
    _run_deploy_script(
        request, 'deploy_worker.sh', 'mvpserver.settings.staging',
        ServerType.WORKER, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision)


def deploy_celery_production(
        request, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision):
    """Deploy the worker to ``ip`` with the production settings."""
    _run_deploy_script(
        request, 'deploy_worker.sh', 'mvpserver.settings.production',
        ServerType.WORKER, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision)


def deploy_app_staging(
        request, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision):
    """Deploy the app server to ``ip`` with the staging settings."""
    _run_deploy_script(
        request, 'deploy_app.sh', 'mvpserver.settings.staging',
        ServerType.APP_SERVER, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision)


def deploy_app_production(
        request, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision):
    """Deploy the app server to ``ip`` with the production settings."""
    _run_deploy_script(
        request, 'deploy_app.sh', 'mvpserver.settings.production',
        ServerType.APP_SERVER, ip, pem_path, deployment_path,
        build_info_obj, vcs_revision)


def process_status(
        request, process_output, process_status,
        ip, server_type, build_info_obj):
    """Record one server's deployment result and update the build's status.

    ``process_status`` is 0 for success and non-zero for failure. The build's
    ``deployment_status`` is overwritten on every call, so it reflects the
    most recently processed server.
    """
    print(process_output)
    succeeded = process_status == 0
    build_info_item = ServerBuildInfo(
        is_success=succeeded,
        console_output=process_output,
        ip=ip,
        server_type=server_type,
        build_info=build_info_obj)
    build_info_obj.deployment_status = succeeded
    build_info_obj.save()
    if succeeded:
        build_info_item.save()
    # NOTE(review): the listing is cut off right after the failure branch
    # saves build_info_obj, so whether the failed ServerBuildInfo row is
    # also saved is not visible here — TODO confirm against the full file.


# Page footer text that followed the listing on the same line:
# Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
