Best Python code snippet using localstack_python
edge.py
Source:edge.py  
...259    proxy = start_proxy_server(port, use_ssl=True, update_listener=ProxyListenerEdge())260    if not asynchronous:261        proxy.join()262    return proxy263def can_use_sudo():264    try:265        run('echo | sudo -S echo', print_error=False)266        return True267    except Exception:268        return False269def ensure_can_use_sudo():270    if not is_root() and not can_use_sudo():271        print('Please enter your sudo password (required to configure local network):')272        run('sudo echo', stdin=True)273def start_edge(port=None, use_ssl=True, asynchronous=False):274    if not port:275        port = config.EDGE_PORT276    if config.EDGE_PORT_HTTP:277        do_start_edge(config.EDGE_PORT_HTTP, use_ssl=False, asynchronous=True)278    if port > 1024 or is_root():279        return do_start_edge(port, use_ssl, asynchronous=asynchronous)280    # process requires priviledged port but we're not root -> try running as sudo281    class Terminator(object):282        def stop(self, quiet=True):283            try:284                url = 'http%s://%s:%s' % ('s' if use_ssl else '', LOCALHOST, port)285                requests.verify_ssl = False286                requests.post(url, headers={HEADER_KILL_SIGNAL: 'kill'})287            except Exception:288                pass289    # make sure we can run sudo commands290    ensure_can_use_sudo()291    # register a signal handler to terminate the sudo process later on292    TMP_THREADS.append(Terminator())293    # start the process as sudo294    sudo_cmd = 'sudo '295    python_cmd = sys.executable296    cmd = '%sPYTHONPATH=.:%s %s %s %s' % (sudo_cmd, LOCALSTACK_ROOT_FOLDER, python_cmd, __file__, port)297    process = run(cmd, asynchronous=asynchronous)298    return process299if __name__ == '__main__':300    logging.basicConfig()...mte_core.py
Source:mte_core.py  
1#!/usr/bin/python32#3# [MTE] Maintenance Task Execution: Core Module4# @Description: 5#6# @Author: Arne Coomans7# @Contact: @arnecoomans on twitter8# @Version: 0.2.09# @Date: 01-01-202110# @Source: https://github.com/arnecoomans/maintenance/11#12#13# Import system modules14import os15import sys16import time17import argparse18import getpass19from datetime import datetime20# Add local shared script directory to import path21#  Local function library22sys.path.append(os.path.dirname(os.path.abspath(__file__)))23import mte_logging as mte_logging24import mte_config as mte_config25import mte_fs as mte_fs26import mte_task_dispatcher as mte_task_dispatcher27class Core:28  def __init__(self):29    # Internal data storage30    self.tasks = []31    self.storage = {}32    self.cache = {}33    self.storage = {34      # Application details35      'name': '[MTE] Maintenance Task Execution',36      'tagline': 'Centralized management of maintenance tasks.',37      'help_url': 'https://github.com/arnecoomans/maintenance',38      'version': '0.2.0',39      'author': 'Arne Coomans',40      # Runtime details41      'start_time': time.time(),42      'base_dir': os.path.split(os.path.dirname(os.path.abspath(__file__)))[0] + '/',43      'default_configuration': 'config/maintenance.yml',44      'runtime_user': getpass.getuser(),45      'runtime_user_id': str(os.getuid()),46      'runtime_group_id': str(os.getgid()),47      'has_root_privilage': self.has_root_privilage(),48      # System defaults49      'backup_target': '/backup/',50      'target_use_gzip': False,51      'can_use_sudo': False,52      # Logging53      'log.display_level': 3,54      'log.output_methods': ['screen', 'file'],55      'log.screen.display_width': 79,56      # Self-check57      # The following directories might not be present at runtime,58      # but might be expected sooner or later. 59      # All these directories are treated relative to 'base_dir'60      'required_directories': ['cache/', 'data/', 'docs/', 'tasks-available/', 'tasks-enabled']61    }62    # Actual processing63    #   Prepare logging64    self.log = mte_logging.Logger(self)65    #   Load configuration file66    self.config = mte_config.Config(self, self.storage['default_configuration'])67    self.log.update_config()68    #self.log.set_display_level()69    #   Parse command line arguments70    self.arguments = self.process_parsed_arguments( self.get_parsed_arguments() )71    #   Load Filesystem functions72    self.fs = mte_fs.Filesystem(self)73    #   Check if at least some configuration is loaded74    #   @todo75    #76    # Self-care77    self.verify_required_directories()78    #79    # Prepare Task Dispatcher80    self.dispatcher = mte_task_dispatcher.TaskDispatcher(self)81    self.config.get_runtime_arguments()82    self.log.flush()83  84  #85  #86  # Display Core information87  def __str__(self):88    return self.get_description()89  def get_version(self):90    return self.storage['version']91  def get_description(self):92    return self.storage['name'] + " v" + self.get_version() + ". "93  def get_tagline(self):94    return self.storage['tagline']95  def calculate_script_duration(self):96    return time.time() - self.storage['start_time']97  def get_runtime_duration(self):98    # Uses the start time defined in core.__init__ and current time to calculate running time.99    # Rounds output to 4 digits behind comma.100    return str(round(self.calculate_script_duration(), 4)) + " seconds"101  102  def get(self, key):103    if key in self.storage:104      return self.storage[key]105    else:106      return False107  108  # SUDO AND ROOT109  def has_root_privilage(self):110    if os.getuid() == 0:111      return True112    else:113      return False114  def get_sudo(self, task=''):115    if (self.config.get('can_use_sudo', task) and 116        self.config.get('run_as_root', task)117        and not self.has_root_privilage()):118      return "sudo "119    return ""120  121  def use_sudo(self, task=''):122    if self.has_root_privilage():123      return False124    elif self.config.get('can_use_sudo', task):125      return True126  127  def panic(self):128    # Clear log.129    sys.exit()130  def get_verified_directory(self, directory, task):131    if directory[-1:] == '/':132      directory = directory[0:-1]133    path = ''134    for part in directory.split('/'):135      if len(part) == 0:136        path += '/'137      else:138        path += part + '/'139        if not os.path.isdir(path):140          # directory does not exist.141          # If in root, use sudo142          if len(path.split('/')) <= 3:143            self.log.add("Need root to create directory [" + path + "] in root level.", 4)144            self.run_command('mkdir ' + path, task)145            self.log.add("Changing ownership of newly created directory to " + self.get('runtime_user'), 4)146            self.run_command('chown ' + self.get('runtime_user_id') + ':' + self.get('runtime_group_id') + ' ' + path, task)147          else:148            self.log.add('Creating directory [' + path + ']', 4)149            self.run_command('mkdir ' + path, task, False)150    # should create directories if required151    # should use sudo for top level directory152    if directory[:-1] != '/':153      directory += '/'154    return directory155  156  def verify_required_directories(self):157    for directory in self.get('required_directories'):158      self.get_verified_directory(self.get('base_dir') + directory, 'core')159  def get_target(self, task):160    # backup_target can be defined in task or core config161    # target_subdirectory can be defined in task or core config162    # task config overrules core config163    target = ''164    subdirectory = ''165    # backup_target166    if self.config.get('backup_target', task):167      target = self.config.get('backup_target', task)168    # target subdirectory169    if self.config.get('target_subdirectory', task):170      subdirectory = self.config.get('target_subdirectory', task)171    elif self.config.get('target_subdirectory'):172      # Only use subdirectory from global configuration if the backup_target173      #  is set in global configuration174      if not self.config.get('backup_target', task, True):175        subdirectory = self.config.get('target_subdirectory')176    # Cleanup177    if len(target) > 1 and target[-1:] != '/':178      target += '/'179    if subdirectory[:1] == '/':180      subdirectory = subdirectory[1:]181    if len(subdirectory) > 1 and subdirectory[-1:] != '/':182      subdirectory += '/'183    184    return self.get_verified_directory(target + subdirectory, task)185  186  def get_gzip(self, task):187    return self.use_gzip(task)188  def use_gzip(self, task):189    if self.config.get('target_use_gzip', task):190      return True191    else:192      return False193  def use_datetime(self, task):194    if self.config.get('target_use_datetime', task):195      return True196    else:197      return False198  def get_date_time(self, task):199    return datetime.now().strftime(self.config.get('date_time_format', task))200  # System command execution201  def run_command(self, command, task, sudo=True):202    if sudo:203      command = self.get_sudo(task) + command204    self.log.add('Executing os-level command: [' + command + "].")205    command = os.popen(command)206    return command.read().strip().split("\n")207  #208  #209  # Parse Command Line Arguments210  def get_parsed_arguments(self):211    # Set welcome text when displaying help text212    parser = argparse.ArgumentParser(description=self.get_description() + self.get_tagline(), 213                                     epilog="By " + self.storage['author'] + ". Have an issue or request? Use " + self.storage['help_url'])214    # List Arguments215    #   Task Selection216    parser.add_argument("-t", "--task",217                        help="* Select task to be executed.",218                        action="append",219                        required=True)220    #   Task Arguments221    parser.add_argument("-arg", "--argument",222                        help="Arguments passed to task",223                        )224    #   Cleanup225    parser.add_argument("-c", '--cleanup',226                        help="Run cleanup after task execution",227                        action="store_true")228    #   Target Selection229    #   Overrides target defined by configuration module.230    parser.add_argument("--target", 231                        help="Override backup target directory")232    # Configuration File233    #   Add a configuration file to be loaded that overrides previous configuration. 234    #   Default configuration is still processed.235    parser.add_argument("--config", 236                        help="Override config file (yml)",237                        action="append")238    # Logging239    #   Define the amount of data that should be processed by the logging module. Overrides240    #   the value in configuration.241    parser.add_argument("-l", "--logging", 242                        help="Override logging level, [none] 0 -- 5 [all]", 243                        type=int, 244                        choices=[0,1,2,3,4,5])245    # Parse arguments246    arguments = parser.parse_args()247    # Return parsed arguments248    return arguments249  def process_parsed_arguments(self, arguments):250    # Task selection251    #   Task arguments252    if arguments.task is not None:253      self.tasks = arguments.task254    # Target selection255    if arguments.target is not None:256      self.log.add("Command line arguments changed backup target from " + self.config.get('backup_target') + " to " + arguments.target + ".", 4)257      self.config.set('backup_target', arguments.target)258    # Configuration File selection259    if arguments.config is not None:260      for file in arguments.config:261        self.log.add("Command line arguments added " + file + " to configuration processer.", 4)262        self.config.get_contents_of_configuration_file(file)263    # Logging264    if arguments.logging is not None:265      if arguments.logging is not self.log.display_level:266        self.log.add('Command line arguments changed log display level from ' + str(self.log.display_level) + " to " + str(arguments.logging) + ".", 4)267        self.config.set('logging', arguments.logging)268        self.log.set_display_level(arguments.logging)...executable_update-all
Source:executable_update-all  
1#!/usr/bin/env python32import argparse3import shutil4import subprocess5import sys6parser = argparse.ArgumentParser(7  description="Install updates automagically.",8  formatter_class=argparse.RawDescriptionHelpFormatter)9parser.add_argument("--clean", "-c",10  action="store_true",11  help="clean cached and orphaned packages")12parser.add_argument("--dry-run", "-n",13  action="store_true",14  help="print actions without running anything")15parser.add_argument("--no-color",16  action="store_true",17  help="do not colorize output")18parser.add_argument("--sudo", "-s",19  action="store_true",20  help="run with sudo (where appropriate)")21parser.add_argument("--update", "-u",22  action="store_true",23  help="update package index")24parser.add_argument("--upgrade", "-U",25  action="store_true",26  help="upgrade packages")27parser.add_argument("--yes", "-y",28  action="store_true",29  help="run without asking for confirmation")30args = parser.parse_args()31class Color:32  RED     = "\033[31m"33  GREEN   = "\033[32m"34  YELLOW  = "\033[33m"35  BLUE    = "\033[34m"36  MAGENTA = "\033[35m"37  CYAN    = "\033[36m"38  NORMAL  = "\033[0m"39class PackageManager:40  """41  Represents a package manager and the commands needed to use it.42  """43  def __init__(self,44      name,45      required_commands,46      required_conditions,47      command_update,48      command_upgrade,49      command_clean,50      can_use_sudo,51      yes_flag):52    """53    Create a new package manager.54    # Arguments55    * name (str): The name of the package manager.56    * required_commands (list<string>): All of the commands that must exist57      on the system for the package manager.58    * required_conditions (list<list<string>>): Conditions that must be met59      for the package manager to be run.60    * command_update (list<string>): The command (with options) to run for61      an update.62    * command_upgrade (list<string>): The command (with options) to run for63      an upgrade.64    * command_clean (list<string>): The command (with options) to run for a65      clean.66    * can_use_sudo (bool): Indicates that the commands are allowed to be67      run with as root.68    * yes_flag (str): The flag to append to the commands when the `--yes`69      option is used.70    """71    self._name = name72    self._required_commands = required_commands73    self._required_conditions = required_conditions74    self._command_update = command_update75    self._command_upgrade = command_upgrade76    self._command_clean = command_clean77    self._can_use_sudo = can_use_sudo78    self._yes_flag = yes_flag79  def run(self, **kwargs):80    """81    Run all prescribed operations if the package manager is present on the82    system.83    # Arguments84    * kwargs: The same as the arguments to the program.85    """86    if self._exists():87      if "update" in kwargs and kwargs["update"]:88        self._run_command("update", self._command_update, **kwargs)89      if "upgrade" in kwargs and kwargs["upgrade"]:90        self._run_command("upgrade", self._command_upgrade, **kwargs)91      if "clean" in kwargs and kwargs["clean"]:92        self._run_command("clean", self._command_clean, **kwargs)93  def _exists(self):94    """95    Test whether the package mamanger exists on the system.96    """97    for command in self._required_commands:98      if shutil.which(command) is None:99        return False100    for condition in self._required_conditions:101      completed = subprocess.run(condition)102      if completed.returncode != 0:103        return False104    return True105  def _run_command(self, action_name, command, **kwargs):106    """107    Run a command. This exits the program if an error occurrs.108    # Arguments109    * action_name (str): The name of the action being run.110    * command (list<str>): The command to run.111    * kwargs: The same as the arguments to the program.112    """113    if command is None:114      return True115    if self._can_use_sudo and "sudo" in kwargs and kwargs["sudo"]:116      command = ["sudo"] + command117    if self._yes_flag and "yes" in kwargs and kwargs["yes"]:118      command = command + [self._yes_flag]119    command_str = " ".join(command)120    # Print the header121    header = "â©â©â© {} {} â©â©â© {}".format(self._name, action_name, command_str)122    if not ("no_color" in kwargs and kwargs["no_color"]):123      header = "{}{}{}".format(Color.MAGENTA, header, Color.NORMAL)124    print(header)125    # Run the command126    if not ("dry_run" in kwargs and kwargs["dry_run"]):127      completed = subprocess.run(command)128      if completed.returncode != 0:129        sys.exit(1)130# The available package managers. Keep sorted alphabetically by name131PACKAGE_MANAGERS = [132  PackageManager(133    name="apt",134    required_commands=["apt-get"],135    required_conditions=[],136    command_update=["apt-get", "update"],137    command_upgrade=["apt-get", "upgrade"],138    command_clean=["apt-get", "autoremove"],139    can_use_sudo=True,140    yes_flag="-y"141  ),142  PackageManager(143    name="brew",144    required_commands=["brew"],145    required_conditions=[],146    command_update=["brew", "update"],147    command_upgrade=["brew", "upgrade"],148    command_clean=["brew", "cleanup"],149    can_use_sudo=False,150    yes_flag=None151  ),152  PackageManager(153    name="cargo",154    required_commands=["cargo", "cargo-install-upgrade"],155    required_conditions=[],156    command_update=None,157    command_upgrade=["cargo", "install-upgrade"],158    command_clean=None,159    can_use_sudo=False,160    yes_flag=None161  ),162  PackageManager(163    name="chezmoi",164    required_commands=["chezmoi"],165    required_conditions=[],166    command_update=None,167    command_upgrade=["chezmoi", "upgrade"],168    command_clean=None,169    can_use_sudo=False,170    yes_flag=None171  ),172  PackageManager(173    name="pip",174    required_commands=["pipupgrade"],175    required_conditions=[],176    command_update=None,177    command_upgrade=["pipupgrade", "--latest"],178    command_clean=None,179    can_use_sudo=False,180    yes_flag="-y"181  ),182  PackageManager(183    name="rustup",184    required_commands=["rustup"],185    required_conditions=[],186    command_update=None,187    command_upgrade=["rustup", "update"],188    command_clean=None,189    can_use_sudo=False,190    yes_flag=None191  ),192  PackageManager(193    name="xbps",194    required_commands=["xbps-install", "xbps-remove"],195    required_conditions=[],196    command_update=["xbps-install", "-S"],197    command_upgrade=["xbps-install", "-u"],198    command_clean=["xbps-remove", "-oO"],199    can_use_sudo=True,200    yes_flag="-y"201  ),202]203for pm in PACKAGE_MANAGERS:...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
