How to use can_use_sudo method in localstack

Best Python code snippet using localstack_python

edge.py

Source:edge.py Github

copy

Full Screen

...259 proxy = start_proxy_server(port, use_ssl=True, update_listener=ProxyListenerEdge())260 if not asynchronous:261 proxy.join()262 return proxy263def can_use_sudo():264 try:265 run('echo | sudo -S echo', print_error=False)266 return True267 except Exception:268 return False269def ensure_can_use_sudo():270 if not is_root() and not can_use_sudo():271 print('Please enter your sudo password (required to configure local network):')272 run('sudo echo', stdin=True)273def start_edge(port=None, use_ssl=True, asynchronous=False):274 if not port:275 port = config.EDGE_PORT276 if config.EDGE_PORT_HTTP:277 do_start_edge(config.EDGE_PORT_HTTP, use_ssl=False, asynchronous=True)278 if port > 1024 or is_root():279 return do_start_edge(port, use_ssl, asynchronous=asynchronous)280 # process requires priviledged port but we're not root -> try running as sudo281 class Terminator(object):282 def stop(self, quiet=True):283 try:284 url = 'http%s://%s:%s' % ('s' if use_ssl else '', LOCALHOST, port)285 requests.verify_ssl = False286 requests.post(url, headers={HEADER_KILL_SIGNAL: 'kill'})287 except Exception:288 pass289 # make sure we can run sudo commands290 ensure_can_use_sudo()291 # register a signal handler to terminate the sudo process later on292 TMP_THREADS.append(Terminator())293 # start the process as sudo294 sudo_cmd = 'sudo '295 python_cmd = sys.executable296 cmd = '%sPYTHONPATH=.:%s %s %s %s' % (sudo_cmd, LOCALSTACK_ROOT_FOLDER, python_cmd, __file__, port)297 process = run(cmd, asynchronous=asynchronous)298 return process299if __name__ == '__main__':300 logging.basicConfig()...

Full Screen

Full Screen

mte_core.py

Source:mte_core.py Github

copy

Full Screen

1#!/usr/bin/python32#3# [MTE] Maintenance Task Execution: Core Module4# @Description: 5#6# @Author: Arne Coomans7# @Contact: @arnecoomans on twitter8# @Version: 0.2.09# @Date: 01-01-202110# @Source: https://github.com/arnecoomans/maintenance/11#12#13# Import system modules14import os15import sys16import time17import argparse18import getpass19from datetime import datetime20# Add local shared script directory to import path21# Local function library22sys.path.append(os.path.dirname(os.path.abspath(__file__)))23import mte_logging as mte_logging24import mte_config as mte_config25import mte_fs as mte_fs26import mte_task_dispatcher as mte_task_dispatcher27class Core:28 def __init__(self):29 # Internal data storage30 self.tasks = []31 self.storage = {}32 self.cache = {}33 self.storage = {34 # Application details35 'name': '[MTE] Maintenance Task Execution',36 'tagline': 'Centralized management of maintenance tasks.',37 'help_url': 'https://github.com/arnecoomans/maintenance',38 'version': '0.2.0',39 'author': 'Arne Coomans',40 # Runtime details41 'start_time': time.time(),42 'base_dir': os.path.split(os.path.dirname(os.path.abspath(__file__)))[0] + '/',43 'default_configuration': 'config/maintenance.yml',44 'runtime_user': getpass.getuser(),45 'runtime_user_id': str(os.getuid()),46 'runtime_group_id': str(os.getgid()),47 'has_root_privilage': self.has_root_privilage(),48 # System defaults49 'backup_target': '/backup/',50 'target_use_gzip': False,51 'can_use_sudo': False,52 # Logging53 'log.display_level': 3,54 'log.output_methods': ['screen', 'file'],55 'log.screen.display_width': 79,56 # Self-check57 # The following directories might not be present at runtime,58 # but might be expected sooner or later. 59 # All these directories are treated relative to 'base_dir'60 'required_directories': ['cache/', 'data/', 'docs/', 'tasks-available/', 'tasks-enabled']61 }62 # Actual processing63 # Prepare logging64 self.log = mte_logging.Logger(self)65 # Load configuration file66 self.config = mte_config.Config(self, self.storage['default_configuration'])67 self.log.update_config()68 #self.log.set_display_level()69 # Parse command line arguments70 self.arguments = self.process_parsed_arguments( self.get_parsed_arguments() )71 # Load Filesystem functions72 self.fs = mte_fs.Filesystem(self)73 # Check if at least some configuration is loaded74 # @todo75 #76 # Self-care77 self.verify_required_directories()78 #79 # Prepare Task Dispatcher80 self.dispatcher = mte_task_dispatcher.TaskDispatcher(self)81 self.config.get_runtime_arguments()82 self.log.flush()83 84 #85 #86 # Display Core information87 def __str__(self):88 return self.get_description()89 def get_version(self):90 return self.storage['version']91 def get_description(self):92 return self.storage['name'] + " v" + self.get_version() + ". "93 def get_tagline(self):94 return self.storage['tagline']95 def calculate_script_duration(self):96 return time.time() - self.storage['start_time']97 def get_runtime_duration(self):98 # Uses the start time defined in core.__init__ and current time to calculate running time.99 # Rounds output to 4 digits behind comma.100 return str(round(self.calculate_script_duration(), 4)) + " seconds"101 102 def get(self, key):103 if key in self.storage:104 return self.storage[key]105 else:106 return False107 108 # SUDO AND ROOT109 def has_root_privilage(self):110 if os.getuid() == 0:111 return True112 else:113 return False114 def get_sudo(self, task=''):115 if (self.config.get('can_use_sudo', task) and 116 self.config.get('run_as_root', task)117 and not self.has_root_privilage()):118 return "sudo "119 return ""120 121 def use_sudo(self, task=''):122 if self.has_root_privilage():123 return False124 elif self.config.get('can_use_sudo', task):125 return True126 127 def panic(self):128 # Clear log.129 sys.exit()130 def get_verified_directory(self, directory, task):131 if directory[-1:] == '/':132 directory = directory[0:-1]133 path = ''134 for part in directory.split('/'):135 if len(part) == 0:136 path += '/'137 else:138 path += part + '/'139 if not os.path.isdir(path):140 # directory does not exist.141 # If in root, use sudo142 if len(path.split('/')) <= 3:143 self.log.add("Need root to create directory [" + path + "] in root level.", 4)144 self.run_command('mkdir ' + path, task)145 self.log.add("Changing ownership of newly created directory to " + self.get('runtime_user'), 4)146 self.run_command('chown ' + self.get('runtime_user_id') + ':' + self.get('runtime_group_id') + ' ' + path, task)147 else:148 self.log.add('Creating directory [' + path + ']', 4)149 self.run_command('mkdir ' + path, task, False)150 # should create directories if required151 # should use sudo for top level directory152 if directory[:-1] != '/':153 directory += '/'154 return directory155 156 def verify_required_directories(self):157 for directory in self.get('required_directories'):158 self.get_verified_directory(self.get('base_dir') + directory, 'core')159 def get_target(self, task):160 # backup_target can be defined in task or core config161 # target_subdirectory can be defined in task or core config162 # task config overrules core config163 target = ''164 subdirectory = ''165 # backup_target166 if self.config.get('backup_target', task):167 target = self.config.get('backup_target', task)168 # target subdirectory169 if self.config.get('target_subdirectory', task):170 subdirectory = self.config.get('target_subdirectory', task)171 elif self.config.get('target_subdirectory'):172 # Only use subdirectory from global configuration if the backup_target173 # is set in global configuration174 if not self.config.get('backup_target', task, True):175 subdirectory = self.config.get('target_subdirectory')176 # Cleanup177 if len(target) > 1 and target[-1:] != '/':178 target += '/'179 if subdirectory[:1] == '/':180 subdirectory = subdirectory[1:]181 if len(subdirectory) > 1 and subdirectory[-1:] != '/':182 subdirectory += '/'183 184 return self.get_verified_directory(target + subdirectory, task)185 186 def get_gzip(self, task):187 return self.use_gzip(task)188 def use_gzip(self, task):189 if self.config.get('target_use_gzip', task):190 return True191 else:192 return False193 def use_datetime(self, task):194 if self.config.get('target_use_datetime', task):195 return True196 else:197 return False198 def get_date_time(self, task):199 return datetime.now().strftime(self.config.get('date_time_format', task))200 # System command execution201 def run_command(self, command, task, sudo=True):202 if sudo:203 command = self.get_sudo(task) + command204 self.log.add('Executing os-level command: [' + command + "].")205 command = os.popen(command)206 return command.read().strip().split("\n")207 #208 #209 # Parse Command Line Arguments210 def get_parsed_arguments(self):211 # Set welcome text when displaying help text212 parser = argparse.ArgumentParser(description=self.get_description() + self.get_tagline(), 213 epilog="By " + self.storage['author'] + ". Have an issue or request? Use " + self.storage['help_url'])214 # List Arguments215 # Task Selection216 parser.add_argument("-t", "--task",217 help="* Select task to be executed.",218 action="append",219 required=True)220 # Task Arguments221 parser.add_argument("-arg", "--argument",222 help="Arguments passed to task",223 )224 # Cleanup225 parser.add_argument("-c", '--cleanup',226 help="Run cleanup after task execution",227 action="store_true")228 # Target Selection229 # Overrides target defined by configuration module.230 parser.add_argument("--target", 231 help="Override backup target directory")232 # Configuration File233 # Add a configuration file to be loaded that overrides previous configuration. 234 # Default configuration is still processed.235 parser.add_argument("--config", 236 help="Override config file (yml)",237 action="append")238 # Logging239 # Define the amount of data that should be processed by the logging module. Overrides240 # the value in configuration.241 parser.add_argument("-l", "--logging", 242 help="Override logging level, [none] 0 -- 5 [all]", 243 type=int, 244 choices=[0,1,2,3,4,5])245 # Parse arguments246 arguments = parser.parse_args()247 # Return parsed arguments248 return arguments249 def process_parsed_arguments(self, arguments):250 # Task selection251 # Task arguments252 if arguments.task is not None:253 self.tasks = arguments.task254 # Target selection255 if arguments.target is not None:256 self.log.add("Command line arguments changed backup target from " + self.config.get('backup_target') + " to " + arguments.target + ".", 4)257 self.config.set('backup_target', arguments.target)258 # Configuration File selection259 if arguments.config is not None:260 for file in arguments.config:261 self.log.add("Command line arguments added " + file + " to configuration processer.", 4)262 self.config.get_contents_of_configuration_file(file)263 # Logging264 if arguments.logging is not None:265 if arguments.logging is not self.log.display_level:266 self.log.add('Command line arguments changed log display level from ' + str(self.log.display_level) + " to " + str(arguments.logging) + ".", 4)267 self.config.set('logging', arguments.logging)268 self.log.set_display_level(arguments.logging)...

Full Screen

Full Screen

executable_update-all

Source:executable_update-all Github

copy

Full Screen

1#!/usr/bin/env python32import argparse3import shutil4import subprocess5import sys6parser = argparse.ArgumentParser(7 description="Install updates automagically.",8 formatter_class=argparse.RawDescriptionHelpFormatter)9parser.add_argument("--clean", "-c",10 action="store_true",11 help="clean cached and orphaned packages")12parser.add_argument("--dry-run", "-n",13 action="store_true",14 help="print actions without running anything")15parser.add_argument("--no-color",16 action="store_true",17 help="do not colorize output")18parser.add_argument("--sudo", "-s",19 action="store_true",20 help="run with sudo (where appropriate)")21parser.add_argument("--update", "-u",22 action="store_true",23 help="update package index")24parser.add_argument("--upgrade", "-U",25 action="store_true",26 help="upgrade packages")27parser.add_argument("--yes", "-y",28 action="store_true",29 help="run without asking for confirmation")30args = parser.parse_args()31class Color:32 RED = "\033[31m"33 GREEN = "\033[32m"34 YELLOW = "\033[33m"35 BLUE = "\033[34m"36 MAGENTA = "\033[35m"37 CYAN = "\033[36m"38 NORMAL = "\033[0m"39class PackageManager:40 """41 Represents a package manager and the commands needed to use it.42 """43 def __init__(self,44 name,45 required_commands,46 required_conditions,47 command_update,48 command_upgrade,49 command_clean,50 can_use_sudo,51 yes_flag):52 """53 Create a new package manager.54 # Arguments55 * name (str): The name of the package manager.56 * required_commands (list<string>): All of the commands that must exist57 on the system for the package manager.58 * required_conditions (list<list<string>>): Conditions that must be met59 for the package manager to be run.60 * command_update (list<string>): The command (with options) to run for61 an update.62 * command_upgrade (list<string>): The command (with options) to run for63 an upgrade.64 * command_clean (list<string>): The command (with options) to run for a65 clean.66 * can_use_sudo (bool): Indicates that the commands are allowed to be67 run with as root.68 * yes_flag (str): The flag to append to the commands when the `--yes`69 option is used.70 """71 self._name = name72 self._required_commands = required_commands73 self._required_conditions = required_conditions74 self._command_update = command_update75 self._command_upgrade = command_upgrade76 self._command_clean = command_clean77 self._can_use_sudo = can_use_sudo78 self._yes_flag = yes_flag79 def run(self, **kwargs):80 """81 Run all prescribed operations if the package manager is present on the82 system.83 # Arguments84 * kwargs: The same as the arguments to the program.85 """86 if self._exists():87 if "update" in kwargs and kwargs["update"]:88 self._run_command("update", self._command_update, **kwargs)89 if "upgrade" in kwargs and kwargs["upgrade"]:90 self._run_command("upgrade", self._command_upgrade, **kwargs)91 if "clean" in kwargs and kwargs["clean"]:92 self._run_command("clean", self._command_clean, **kwargs)93 def _exists(self):94 """95 Test whether the package mamanger exists on the system.96 """97 for command in self._required_commands:98 if shutil.which(command) is None:99 return False100 for condition in self._required_conditions:101 completed = subprocess.run(condition)102 if completed.returncode != 0:103 return False104 return True105 def _run_command(self, action_name, command, **kwargs):106 """107 Run a command. This exits the program if an error occurrs.108 # Arguments109 * action_name (str): The name of the action being run.110 * command (list<str>): The command to run.111 * kwargs: The same as the arguments to the program.112 """113 if command is None:114 return True115 if self._can_use_sudo and "sudo" in kwargs and kwargs["sudo"]:116 command = ["sudo"] + command117 if self._yes_flag and "yes" in kwargs and kwargs["yes"]:118 command = command + [self._yes_flag]119 command_str = " ".join(command)120 # Print the header121 header = "⟩⟩⟩ {} {} ⟩⟩⟩ {}".format(self._name, action_name, command_str)122 if not ("no_color" in kwargs and kwargs["no_color"]):123 header = "{}{}{}".format(Color.MAGENTA, header, Color.NORMAL)124 print(header)125 # Run the command126 if not ("dry_run" in kwargs and kwargs["dry_run"]):127 completed = subprocess.run(command)128 if completed.returncode != 0:129 sys.exit(1)130# The available package managers. Keep sorted alphabetically by name131PACKAGE_MANAGERS = [132 PackageManager(133 name="apt",134 required_commands=["apt-get"],135 required_conditions=[],136 command_update=["apt-get", "update"],137 command_upgrade=["apt-get", "upgrade"],138 command_clean=["apt-get", "autoremove"],139 can_use_sudo=True,140 yes_flag="-y"141 ),142 PackageManager(143 name="brew",144 required_commands=["brew"],145 required_conditions=[],146 command_update=["brew", "update"],147 command_upgrade=["brew", "upgrade"],148 command_clean=["brew", "cleanup"],149 can_use_sudo=False,150 yes_flag=None151 ),152 PackageManager(153 name="cargo",154 required_commands=["cargo", "cargo-install-upgrade"],155 required_conditions=[],156 command_update=None,157 command_upgrade=["cargo", "install-upgrade"],158 command_clean=None,159 can_use_sudo=False,160 yes_flag=None161 ),162 PackageManager(163 name="chezmoi",164 required_commands=["chezmoi"],165 required_conditions=[],166 command_update=None,167 command_upgrade=["chezmoi", "upgrade"],168 command_clean=None,169 can_use_sudo=False,170 yes_flag=None171 ),172 PackageManager(173 name="pip",174 required_commands=["pipupgrade"],175 required_conditions=[],176 command_update=None,177 command_upgrade=["pipupgrade", "--latest"],178 command_clean=None,179 can_use_sudo=False,180 yes_flag="-y"181 ),182 PackageManager(183 name="rustup",184 required_commands=["rustup"],185 required_conditions=[],186 command_update=None,187 command_upgrade=["rustup", "update"],188 command_clean=None,189 can_use_sudo=False,190 yes_flag=None191 ),192 PackageManager(193 name="xbps",194 required_commands=["xbps-install", "xbps-remove"],195 required_conditions=[],196 command_update=["xbps-install", "-S"],197 command_upgrade=["xbps-install", "-u"],198 command_clean=["xbps-remove", "-oO"],199 can_use_sudo=True,200 yes_flag="-y"201 ),202]203for pm in PACKAGE_MANAGERS:...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful