How to use is_interrupted method in Slash

Best Python code snippet using slash

utils.py

Source:utils.py Github

copy

Full Screen

1import logging2import os3import random4import re5import socket6import sys7import time8import unicodedata9from io import BufferedReader10from pathlib import Path11from shutil import which12from subprocess import PIPE, Popen, check_call13from typing import IO, Any, Dict, List, Tuple, Union14from urllib.parse import urlparse15import click16import verboselogs17# from libnmap.objects.report import NmapReport18# from libnmap.parser import NmapParser19# from libnmap.process import NmapProcess20from progress.spinner import PixelSpinner21from progressbar import ETA, Bar, Counter, ProgressBar, Timer22from stringcolor import bold23from .config import BASE_PATH, ENV_MODE, LOGGING_LEVEL, PROJECT_NAME24from .defaultLogBanner import log_runBanner25from .locater import Locator26# ------------------------------------------------------------------------------27#28#29#30# ------------------------------------------------------------------------------31class Context:32 progress: Dict[int, ProgressBar] = {}33 def __init__(self):34 logging.log(logging.DEBUG, 'init context...')35 self.service: Any = None36 self.project: str = PROJECT_NAME37 self.base_path: str = BASE_PATH38 self.home_path: Path = Path.home()39 self.use_sudo: List[str] = []40 self.utils: Union[Utils, None] = None41 self.disable_split_project: Union[bool, None] = None42 self.disable_split_host: Union[bool, None] = None43 self.print_only_mode: Union[bool, None] = None44 self.terminal_read_mode: bool = False45 self.logging_verbose: Union[int, None] = None46 self.logging_level: Union[str, None] = None47pass_context = click.make_pass_decorator(Context, ensure=True)48# ------------------------------------------------------------------------------49#50#51#52# ------------------------------------------------------------------------------53class Utils:54 runner_init_count = 155 runner_time_check_running = 156 runner_text_it_is_running = [57 "...yep, still running",58 "...no stress, process still running",59 "...process is aaalive ;)",60 "...we current still processing, please wait ... loooong time :P",61 "...still running bro"62 ]63 # --------------------------------------------------------------------------64 #65 #66 #67 # --------------------------------------------------------------------------68 def __init__(self, ctx: Context):69 self.update(ctx, is_init=True)70 def update(self, ctx: Context, is_init: bold = False):71 self.ctx = ctx72 if not is_init and LOGGING_LEVEL == logging.getLevelName(logging.DEBUG):73 print()74 print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')75 logging.log(logging.DEBUG, f'LOGGING-LEVEL : {bold(LOGGING_LEVEL)}')76 logging.log(logging.DEBUG, f'DISABLED SPLIT PROJECT : {bold(self.ctx.disable_split_project)}')77 logging.log(logging.DEBUG, f'DISABLED SPLIT HOST : {bold(self.ctx.disable_split_host)}')78 logging.log(logging.DEBUG, f'PRINT ONLY MODE : {bold(self.ctx.print_only_mode)}')79 logging.log(logging.DEBUG, f'PROJECT-PATH : {bold(self.create_service_path(None))}{bold("/")}')80 logging.log(logging.DEBUG, f'ENV-MODE : {bold(ENV_MODE)}')81 print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')82 print()83 # --------------------------------------------------------------------------84 #85 # path | folder | file - helper86 #87 # --------------------------------------------------------------------------88 def create_folder(self, path: str) -> None:89 Path(path).mkdir(parents=True, exist_ok=True, mode=0o700)90 def get_user_path(self) -> str:91 return str(Path.home())92 def create_service_folder(self, name: str, host: Union[str, None] = None) -> str:93 path = f'{self.create_service_path(host)}/{name}'94 self.create_folder(path)95 logging.log(logging.DEBUG, f'new folder created:: {path}')96 return path97 def create_service_path(self, host: Union[str, None] = None):98 if not self.ctx.disable_split_host and host is not None:99 host = self.slugify(host)100 host = '' if host is None else f'/{host}'101 else:102 host = ''103 if not self.ctx.disable_split_project:104 project = '' if self.ctx.project is None else f'/{self.ctx.project}'105 else:106 project = ''107 if self.ctx.base_path[-1] == '/':108 self.ctx.base_path = self.ctx.base_path[:-1]109 return f'{self.ctx.base_path}{project}{host}'110 # --------------------------------------------------------------------------111 #112 # command exec - helper113 #114 # --------------------------------------------------------------------------115 def run_command_endless(self, command_list=[]) -> None:116 sub_p: Union[Popen[bytes], None] = None117 is_running = True118 try:119 index_to_check = 0120 index_to_check = 1 if command_list[index_to_check] == 'sudo' else index_to_check121 # if sudo is in command, first check into root122 if index_to_check == 1:123 if self.prompt_sudo() != 0:124 sys.exit(4)125 logging.log(verboselogs.NOTICE, ' '.join(command_list))126 if self.is_tool(command_list[index_to_check]):127 with Popen(command_list) as sub_p:128 while is_running:129 time.sleep(600)130 else:131 logging.log(logging.ERROR, f'the command "{command_list[index_to_check]}", did not exist')132 except (SystemExit, KeyboardInterrupt) as k:133 logging.log(logging.WARNING, f'process interupted! ({k})')134 except Exception as e:135 logging.log(logging.CRITICAL, e, exc_info=True)136 is_running = False137 try:138 if sub_p is not None:139 sub_p.terminate()140 except Exception:141 pass142 try:143 if sub_p is not None:144 while sub_p.poll() is None:145 time.sleep(1)146 except Exception:147 pass148 def run_command(self, command_list: List[str] = [], input_value: Union[str, None] = None149 ) -> Tuple[Union[str, None], Union[str, None], bool]:150 sub_std_res: Union[str, None] = None151 sub_err_res: Union[str, None] = None152 is_interrupted: bool = False153 if not self.ctx.print_only_mode:154 try:155 sub_std: Union[bytes, str, None] = None156 sub_err: Union[bytes, str, None] = None157 index_to_check = 0158 index_to_check = 1 if command_list[index_to_check] == 'sudo' else index_to_check159 # if sudo is in command, first check into root160 if index_to_check == 1:161 if self.prompt_sudo() != 0:162 sys.exit(4)163 if self.is_tool(command_list[index_to_check]):164 if input_value is None:165 # , start_new_session=True166 with Popen(command_list, stdout=PIPE, stderr=PIPE) as sub_p:167 sub_std, sub_err, is_interrupted = self.subprocess_handler(168 sub_p=sub_p, input_value=input_value, command=command_list[index_to_check])169 else:170 with Popen(command_list, stdout=PIPE, stderr=PIPE, stdin=PIPE) as sub_p:171 sub_std, sub_err, is_interrupted = self.subprocess_handler(172 sub_p=sub_p, input_value=input_value, command=command_list[index_to_check])173 else:174 logging.log(logging.ERROR, f'the command "{command_list[index_to_check]}", did not exist')175 sub_err = b"MISSING_COMMAND"176 if sub_std is not None and isinstance(sub_std, bytes):177 sub_std_res = sub_std.decode()178 if sub_err is not None and isinstance(sub_err, bytes) and len(sub_err) > 0:179 sub_err_res = sub_err.decode()180 logging.log(logging.ERROR, sub_err.split(b'\n'))181 except KeyboardInterrupt as k:182 logging.log(logging.WARNING, f'process interupted! ({k})')183 is_interrupted = True184 except Exception as e:185 logging.log(logging.CRITICAL, e, exc_info=True)186 return (sub_std_res, sub_err_res, is_interrupted)187 def subprocess_handler(self, sub_p: Popen[Any], input_value: Union[str, None] = None,188 command: Union[str, None] = None189 ) -> Tuple[Union[bytes, None], Union[bytes, None], bool]:190 sub_std: Union[bytes, None] = None191 sub_err: Union[bytes, None] = None192 sub_p_std: Union[IO[bytes], bytes, None] = None193 sub_p_err: Union[IO[bytes], None] = None194 is_interrupted: bool = False195 try:196 if sub_p.stdin is not None and input_value is not None:197 sub_p.stdin.write(input_value.encode())198 sub_p.stdin.close()199 if sub_p.poll() is None:200 if not self.ctx.terminal_read_mode or (command is not None and command == 'tee'):201 time.sleep(self.runner_time_check_running)202 with PixelSpinner('Processing... ') as spinner:203 while sub_p.poll() is None:204 if self.runner_init_count % 6 == 0:205 spinner.message = f'{random.choice(self.runner_text_it_is_running)} '206 self.runner_init_count = 1207 spinner.next()208 self.runner_init_count += 1209 time.sleep(self.runner_time_check_running)210 if sub_p.stdout is not None:211 sub_p_std = sub_p.stdout212 else:213 logging.log(214 logging.INFO, 'you run in terminal read mode, some function can maybe not print anything and you will see longer no response, please wait ...')215 for stdout_line in sub_p.stdout:216 if stdout_line is not None and len(stdout_line) > 0:217 if sub_p_std is None:218 sub_p_std = stdout_line219 else:220 sub_p_std += stdout_line221 logging.log(logging.INFO, stdout_line.decode().replace('\n', ''))222 if sub_p.stderr is not None:223 sub_p_err = sub_p.stderr224 except (SystemExit, KeyboardInterrupt):225 is_interrupted = True226 if not self.ctx.terminal_read_mode:227 if sub_p.stdout is not None:228 sub_p_std = sub_p.stdout229 if sub_p.stderr is not None:230 sub_p_err = sub_p.stderr231 try:232 sub_p.kill()233 except Exception:234 pass235 if isinstance(sub_p_std, bytes):236 sub_std = sub_p_std237 if isinstance(sub_p_std, BufferedReader):238 sub_std = sub_p_std.read()239 if isinstance(sub_p_err, BufferedReader):240 sub_err = sub_p_err.read()241 return (sub_std, sub_err, is_interrupted)242 def is_tool(self, name: str) -> bool:243 '''244 Check whether `name` is on PATH and marked as executable.245 '''246 return which(name) is not None247 def run_command_output_loop(self, msg: str, cmds: List[List[str]] = [], output: bool = True) -> Union[str, None]:248 '''249 run command from list in a loop, and also optional pipe them into each other250 default exec function is "run_command" with different251 '''252 cmd_result: Union[str, None] = None253 is_interrupted: bool = False254 try:255 log_runBanner(msg)256 if len(cmds) <= 1:257 output = False258 for cmd in cmds:259 if not is_interrupted or cmd[0] == 'tee':260 logging.log(verboselogs.NOTICE, ' '.join(cmd))261 if output:262 cmd_result, std_err, is_interrupted = self.run_command(263 command_list=cmd, input_value=cmd_result)264 else:265 cmd_result, std_err, is_interrupted = self.run_command(command_list=cmd)266 if std_err is not None and std_err == "MISSING_COMMAND":267 cmd_result = None268 break269 if cmd_result is not None:270 if len(cmd_result) > 0:271 logging.log(verboselogs.SPAM, f'output is:\n{cmd_result}')272 else:273 cmd_result = None274 if output:275 logging.log(logging.WARNING, 'no result available to pipe')276 break277 elif output:278 logging.log(logging.WARNING, 'no result available to pipe')279 break280 except KeyboardInterrupt as k:281 logging.log(logging.WARNING, f'process interupted! ({k})')282 raise KeyboardInterrupt283 except Exception as e:284 logging.log(logging.CRITICAL, e, exc_info=True)285 if is_interrupted and cmd_result is None:286 raise KeyboardInterrupt287 return cmd_result288 # --------------------------------------------------------------------------289 #290 #291 #292 # --------------------------------------------------------------------------293 def group(self, flat: List[Any], size: int) -> List[Any]:294 '''295 group list a flat list into a matrix of "size"296 '''297 return [flat[i:i+size] for i in range(0, len(flat), size)]298 def normalize_caseless(self, text: str) -> str:299 '''300 lowercase a string, for any unicode301 '''302 return unicodedata.normalize('NFKD', text.casefold())303 def slugify(self, value: Union[str, None], allow_unicode: bool = False) -> Union[str, None]:304 '''305 https://github.com/django/django/blob/main/django/utils/text.py306 '''307 value = str(value)308 if allow_unicode:309 value = unicodedata.normalize('NFKC', value)310 else:311 value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')312 value = re.sub(r'[^\w\s-]', '', value.lower())313 return re.sub(r'[-\s]+', '-', value).strip('-_')314 # --------------------------------------------------------------------------315 #316 #317 #318 # --------------------------------------------------------------------------319 def in_sudo_mode(self) -> None:320 '''321 If the user doesn't run the program with super user privileges, don't allow them to continue.322 '''323 if 'SUDO_UID' not in os.environ.keys():324 logging.log(logging.ERROR, 'Try running this program with sudo.')325 sys.exit(1)326 def prompt_sudo(self) -> int:327 try:328 if os.geteuid() != 0:329 msg = "hay [sudo] password for %u: "330 return check_call(f"sudo -v -p '{msg}'", shell=True)331 except Exception:332 pass333 return -1334 def get_ip_address(self) -> Union[str, None]:335 IP = None336 st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)337 try:338 st.connect(('10.255.255.255', 1))339 IP = st.getsockname()[0]340 except Exception:341 IP = '127.0.0.1'342 finally:343 st.close()344 return IP345 def uri_validator(self, url: str) -> Union[str, None]:346 try:347 if url.endswith('/'):348 url = url[:-1]349 result = urlparse(url)350 if all([result.scheme, result.netloc]):351 return url352 except Exception:353 pass354 return None355 def geo(self) -> Union[str, None]:356 '''357 This is a geo test example358 '''359 try:360 return Locator(ctx=self.ctx).check_database()361 except Exception as e:362 logging.log(logging.CRITICAL, e, exc_info=True)363 return None364 # --------------------------------------------------------------------------365 #366 #367 #368 # --------------------------------------------------------------------------369 def progress(self, id: int, value: int, description: str = "Processing", maxval: int = 100) -> None:370 try:371 # if self.ctx.progress.get(id) is None:372 # self.ctx.progress[id] = tqdm(total=maxval, desc=description, colour="#000", leave=False)373 # if self.ctx.progress.get(id) is not None:374 # bar = self.ctx.progress.get(id)375 # bar.update(value)376 if self.ctx.progress.get(id) is None:377 self.ctx.progress[id] = ProgressBar(378 widgets=[description, ' [', Timer(), '] ', Bar(marker='O'), ' [', Counter(379 format='%(value)02d/%(max_value)d'), ']', ' (', ETA(), ') '],380 maxval=maxval).start()381 bar_p: ProgressBar = self.ctx.progress.get(id)382 bar_p.update(value=value)383 if value >= maxval:384 print()385 except Exception as e:386 logging.log(logging.CRITICAL, e, exc_info=True)387 # def nmap_process(self, msg: str, host: str, options: List[str], safe_mode: bool = True) -> NmapReport:388 # try:389 # log_runBanner(msg)390 # logging.log(verboselogs.NOTICE, f'nmap {" ".join(host)} {" ".join(options)}')391 # if not self.ctx.print_only_mode:392 # nmap_proc: NmapProcess = NmapProcess(targets=host, options=' '.join(options), safe_mode=safe_mode)393 # nmap_proc.run_background()394 # while nmap_proc.is_running():395 # self.progress(100, float(nmap_proc.progress))396 # time.sleep(0.01)397 # self.progress(100, 100)398 # if nmap_proc.stderr is not None:399 # if "QUITTING" in nmap_proc.stderr:400 # logging.log(logging.CRITICAL, nmap_proc.stderr)401 # return None402 # logging.log(logging.WARNING, nmap_proc.stderr)403 # return NmapParser.parse(nmap_proc.stdout)404 # except Exception as e:405 # logging.log(logging.CRITICAL, e, exc_info=True)406 # --------------------------------------------------------------------------407 #408 #409 #410 # --------------------------------------------------------------------------411 def define_option_list(self, options: str, default_options: List[Any] = [],412 options_append: bool = False, default_split_by: str = ',') -> List[Any]:413 '''414 defines a list of option to use in a callable service415 to define how to create this list416 by:417 - create it from a default only418 - create it from params only419 - create it by combine default and params420 '''421 try:422 result: List[Any] = []423 # add options from params424 if options is not None and not options_append:425 result = [options] # .split(default_split_by)426 # add options from params to existing options427 elif options is not None and options_append:428 result = default_options + [options] # .split(default_split_by)429 # use existing options430 else:431 result = default_options432 return result433 except Exception as e:434 logging.log(logging.CRITICAL, e, exc_info=True)...

Full Screen

Full Screen

trolley_tuner_base.py

Source:trolley_tuner_base.py Github

copy

Full Screen

1import logging2from abc import ABC3from eva.lib.parameters import Parameters4from eva.robots.trolley_base import TrolleyBase5from eva.tuner.tuner_base import TunerBase6logger = logging.getLogger()7class TrolleyTunerBase(TrolleyBase, TunerBase, ABC):8 MAX_STEPS = 79 def __init__(self):10 super(TrolleyTunerBase, self).__init__()11 self._params = Parameters({'is_interrupted': False})12 def _maximize_params(self, get_params, start_value=0.0, end_value=1.0, step=0):13 current_value = (start_value + end_value) * 0.514 if step >= self.MAX_STEPS:15 return start_value16 self._restore_and_update_params(get_params(current_value))17 self._run()18 if self._is_system_stable():19 return self._maximize_params(get_params, current_value, end_value, step + 1)20 else:21 return self._maximize_params(get_params, start_value, current_value, step + 1)22 def _process(self):23 self._params.backup()24 def _restore_and_update_params(self, params):25 self._params.restore()26 self._params.update(params)27 def _prepare(self):28 # Set manually to the initial position29 self._wait_button_press()30 super(TrolleyTunerBase, self)._prepare()31 def _is_system_stable(self):32 return not self._params.is_interrupted33 def _stopping(self, measure):34 result = self._check_button_hold()35 if result is not None:36 self._params.is_interrupted = result37 return True...

Full Screen

Full Screen

interrupt_util.py

Source:interrupt_util.py Github

copy

Full Screen

...10 interrupted = False11 def set_interrupted(signum, frame):12 nonlocal interrupted13 interrupted = True14 def is_interrupted():15 nonlocal interrupted16 return interrupted17 # Replace the current interrupt handler18 original_sigint_handler = signal.getsignal(signal.SIGINT)19 signal.signal(signal.SIGINT, set_interrupted)20 try:21 yield is_interrupted22 finally:23 # Restore original interrupt handler24 signal.signal(signal.SIGINT, original_sigint_handler)25if __name__ == "__main__":26 import time27 for i in range(5):28 print(i)29 time.sleep(1)30 print("-- Interrupts will now simply halt the loop --")31 with interrupt_catcher() as is_interrupted:32 for i in range(5):33 if is_interrupted():34 break35 print(i)36 time.sleep(1)37 print("-- Interrupts are back to normal --")38 for i in range(5):39 print(i)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Slash automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful