Best Python code snippet using tempest_python
distrolayer.py
Source:distrolayer.py  
import shlex
import subprocess
import sys
from subprocess import run as Run

import invoke
from invoke.exceptions import UnexpectedExit


class DistroAbstractionLayer:
    """Build distribution-specific shell commands behind one generic API.

    On construction the layer probes the target machine (the local host, or
    the host behind ``remote_connection``) to find out which Linux
    distribution it runs, then maps generic actions such as ``install`` or
    ``update`` to the command template of that distribution's family.
    """

    _knowndistros = ['ubuntu', 'debian', 'manjaro', 'arch', 'centos', 'raspbian', 'red hat', 'fedora']

    # Maps the known distributions to parent distributions so we don't need
    # to handle lots of package managers.
    distro_map = {
        'ubuntu': 'debian',
        'debian': 'debian',
        'raspbian': 'debian',
        'manjaro': 'arch',
        'arch': 'arch',
        'centos': 'red hat',
        'red hat': 'red hat',
        'fedora': 'red hat',
    }

    # The probes below keep only the first space-separated word of their
    # output, so a two-word name comes back as just "red"; map it back to
    # the full name used as a key in the tables above.
    _redhat_dumb_map = {
        'red': 'red hat',
    }

    # Command templates by parent distribution. A ``$NAME$`` placeholder is
    # replaced by the parameter of the command called ``name``; ``None``
    # marks an action with no template for that family.
    _command_map = {
        'debian': {
            "install": "apt-get install $INSTALL$ -y",
            "upgrade": "apt-get upgrade -y",
            "update": "apt-get update -y",
            "hostname": "hostnamectl set-hostname $HOSTNAME$",
            "reboot": "reboot",
            "enable_service": "systemctl enable $ENABLE_SERVICE$",
        },
        'arch': {
            "install": "pacman -Sy $INSTALL$ --noconfirm",
            "update": "pacman -Syy",
            "upgrade": None,
            "hostname": "hostnamectl set-hostname $HOSTNAME$",
            "reboot": "reboot",
            "enable_service": "systemctl enable $ENABLE_SERVICE$",
        },
        'red hat': {
            "install": "yum install $INSTALL$ -y",
            "update": "yum update -y",
            "upgrade": None,
            "hostname": "hostnamectl set-hostname $HOSTNAME$",
            "reboot": "reboot",
            "enable_service": "systemctl enable $ENABLE_SERVICE$",
        },
    }

    # Probe commands, tried in order, each paired with a parser that pulls
    # the distribution name (first word) out of the probe's output.
    distro_map_commands = {
        'lsb_release -ds': lambda command_output: command_output.rstrip().split(' ')[0],
        'cat /etc/redhat-release': lambda command_output: command_output.rstrip().split(' ')[0],
        'cat /etc/issue': lambda command_output: command_output.rstrip().split(' ')[0],
    }

    _custom_commands = {}

    def __init__(self, remote_connection=None, custom_command_map=None):
        """Detect the distribution and register optional custom commands.

        Args:
            remote_connection: An already-connected connection object, or
                ``None`` to run everything on the local machine. The code
                only requires ``run(command, hide=True)`` returning an
                object with ``.stdout``.
                NOTE(review): the original comment called this a paramiko
                connection, but the calls and the ``UnexpectedExit``
                handling match the invoke/fabric API -- confirm.
            custom_command_map: Optional dictionary whose keys are command
                names and whose values are command strings. If a command
                takes an input, the placeholder is the upper-cased command
                name surrounded by ``$``; for a command named "install":
                ``dict(install="some command $INSTALL$")``. Only a single
                parameter per custom command is supported.

        Raises:
            Exception: If no probe command could identify the distribution.
        """
        self._connection = remote_connection
        self.distro = self.__get_distro__()
        # Always give the instance its own dict so instances never share
        # (or mutate) the class-level default.
        self._custom_commands = dict(custom_command_map) if custom_command_map else {}

    def _run(self, command):
        """Run *command* remotely or locally and return its stdout as text."""
        if self._connection:
            return self._connection.run(command, hide=True).stdout
        # shlex.split honours shell-style quoting, so quoted arguments stay
        # intact when the command is executed without a shell.
        completed = Run(shlex.split(command), stdout=subprocess.PIPE)
        return completed.stdout.decode('utf-8')

    def __get_distro__(self):
        """Return the distribution name reported by the first working probe.

        Raises:
            Exception: If every probe fails or yields an empty name.
        """
        distro = None
        for probe, parse in self.distro_map_commands.items():
            try:
                distro = parse(self._run(probe))
            except (FileNotFoundError, UnexpectedExit):
                # Probe binary missing locally, or the remote command
                # failed: fall through to the next probe.
                continue
            distro = self._redhat_dumb_map.get(distro.lower(), distro)
            if distro:
                break
        if not distro:
            raise Exception('Unknown Distribution Of Linux')
        if distro.lower() not in self._knowndistros:
            print(f"We aren't exactly sure how to handle {distro}. We will try out best")
        return distro

    def install(self, package):
        """Build the command that installs *package*."""
        return self._create_command('install', param=package)

    def update(self):
        """Build the distribution's "update" command."""
        return self._create_command('update')

    def upgrade(self):
        """Build the distribution's "upgrade" command."""
        return self._create_command('upgrade')

    def set_hostname(self, hostname):
        """Build the command that sets the machine's hostname."""
        return self._create_command('hostname', param=hostname)

    def reboot(self):
        """Build the reboot command."""
        return self._create_command('reboot')

    def custom_command(self, command, param=None):
        """Build the command registered under the name *command*."""
        return self._create_command(command, param)

    def get_groups_on_server(self):
        """Return the group names listed in ``/etc/group`` on the target."""
        output = self._run('cat /etc/group')
        # Skip blank lines so the trailing newline does not yield a '' group.
        return [line.split(':')[0] for line in output.splitlines() if line]

    def encrypt_password(self, password):
        """Return the ``crypt()`` hash of *password* computed on the target.

        The hash is computed by ``python3`` on the target machine.
        NOTE(review): the ``crypt`` module was removed in Python 3.13, so
        this needs an older ``python3`` on the target.
        NOTE(review): the original snippet also applied
        ``.replace('$', r'$')`` to the hash, which is a no-op as written;
        if escaping ``$`` was intended it has to be restored deliberately.
        """
        # repr() embeds the password as a valid Python string literal, and
        # shlex.quote keeps the whole script a single shell argument, so
        # quotes or spaces in the password cannot break the command.
        script = f"from crypt import crypt; print(crypt({password!r}))"
        return self._run(f"python3 -c {shlex.quote(script)}").rstrip()

    def get_program_path(self, program):
        """Return the path ``which`` reports for *program*, or ``None``.

        Raises:
            UnexpectedExit: If the remote ``which`` fails with an exit code
                other than 1 (1 is treated as "program not found").
        """
        try:
            output = self._run(f'which {shlex.quote(program)}')
        except UnexpectedExit as exception:
            result = exception.result
            if result.return_code != 1:
                raise
            output = result.stdout
        output = output.rstrip()
        if output.startswith('which:'):
            # Some `which` implementations report "which: no <program> in ..."
            return None
        return output or None

    def _create_command(self, command, param=None):
        """Resolve *command* to its template and substitute *param*.

        Custom commands take precedence over the built-in templates of the
        detected distribution family.

        Raises:
            NotImplementedError: If *command* is neither a custom command
                nor a built-in one for this distribution.
        """
        # An unrecognised distribution has no built-in templates but can
        # still use custom commands.
        d_map = self.distro_map.get(self.distro.lower())
        builtin = self._command_map.get(d_map, {})
        if command not in self._custom_commands and command not in builtin:
            raise NotImplementedError(f'{command} is not implemented for {self.distro}')
        _c = self._custom_commands.get(command)
        if not _c and command in builtin:
            _c = builtin[command]
        if param and _c:
            _c = _c.replace(f"${command.upper()}$", param)
        # NOTE(review): the source excerpt is cut off at this point ("...");
        # whatever the method does next with `_c` is not visible here.
        ...

# arbotix_disable_all_servos.py
Source:arbotix_disable_all_servos.py  
#!/usr/bin/env python
"""
    arbotix_disable_all_servos.py - Version 0.1 2012-03-24

    Disable all servos

    Created for the Pi Robot Project: http://www.pirobot.org
    Copyright (c) 2013 Patrick Goebel.  All rights reserved.

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details at:

    http://www.gnu.org/licenses/gpl.html
"""
import time

import rospy
from arbotix_msgs.srv import Enable


class DisableServos:
    """ROS node that calls every joint's ``enable`` service with ``False``."""

    def __init__(self):
        rospy.init_node('disable_all_servos')

        # The list of joints is stored in the /arbotix/joints parameter
        self.joints = rospy.get_param('/arbotix/joints', '')

        # A list to hold the enable services
        enable_services = []

        for joint in self.joints:
            enable_service = '/' + joint + '/enable'
            # Block until the service is advertised before creating a proxy.
            rospy.wait_for_service(enable_service)
            enable_services.append(rospy.ServiceProxy(enable_service, Enable))

        # Relax all servos to give them a rest.
        for enable in enable_services:
            enable(False)


if __name__ == '__main__':
    try:
        DisableServos()
        rospy.loginfo("All servos disabled.")
    except rospy.ROSInterruptException:
        rospy.loginfo("Oops! Exception occurred while trying to disable servos...")

# arbotix_enable_all_servos.py
Source:arbotix_enable_all_servos.py  
#!/usr/bin/env python
"""
    arbotix_enable_all_servos.py - Version 0.1 2012-03-24

    Enable all servos

    Created for the Pi Robot Project: http://www.pirobot.org
    Copyright (c) 2013 Patrick Goebel.  All rights reserved.

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details at:

    http://www.gnu.org/licenses/gpl.html
"""
import time

import rospy
from arbotix_msgs.srv import Enable


class EnableServos:
    """ROS node that calls every joint's ``enable`` service with ``True``."""

    def __init__(self):
        rospy.init_node('enable_all_servos')

        # The list of joints is stored in the /arbotix/joints parameter
        self.joints = rospy.get_param('/arbotix/joints', '')

        # A list to hold the enable services
        enable_services = []

        for joint in self.joints:
            enable_service = '/' + joint + '/enable'
            # Block until the service is advertised before creating a proxy.
            rospy.wait_for_service(enable_service)
            enable_services.append(rospy.ServiceProxy(enable_service, Enable))

        # Enable every servo (the original comment said "relax", which was
        # copied over from the disable script).
        for enable in enable_services:
            enable(True)


if __name__ == '__main__':
    try:
        EnableServos()
        rospy.loginfo("All servos enabled.")
    except rospy.ROSInterruptException:
        rospy.loginfo("Oops! Exception occurred while trying to enable servos...")

# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
The Learning Hub covers everything from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
