Best Python code snippet using slash
utils.py
Source:utils.py  
1import logging2import os3import random4import re5import socket6import sys7import time8import unicodedata9from io import BufferedReader10from pathlib import Path11from shutil import which12from subprocess import PIPE, Popen, check_call13from typing import IO, Any, Dict, List, Tuple, Union14from urllib.parse import urlparse15import click16import verboselogs17# from libnmap.objects.report import NmapReport18# from libnmap.parser import NmapParser19# from libnmap.process import NmapProcess20from progress.spinner import PixelSpinner21from progressbar import ETA, Bar, Counter, ProgressBar, Timer22from stringcolor import bold23from .config import BASE_PATH, ENV_MODE, LOGGING_LEVEL, PROJECT_NAME24from .defaultLogBanner import log_runBanner25from .locater import Locator26# ------------------------------------------------------------------------------27#28#29#30# ------------------------------------------------------------------------------31class Context:32    progress: Dict[int, ProgressBar] = {}33    def __init__(self):34        logging.log(logging.DEBUG, 'init context...')35        self.service: Any = None36        self.project: str = PROJECT_NAME37        self.base_path: str = BASE_PATH38        self.home_path: Path = Path.home()39        self.use_sudo: List[str] = []40        self.utils: Union[Utils, None] = None41        self.disable_split_project: Union[bool, None] = None42        self.disable_split_host: Union[bool, None] = None43        self.print_only_mode: Union[bool, None] = None44        self.terminal_read_mode: bool = False45        self.logging_verbose: Union[int, None] = None46        self.logging_level: Union[str, None] = None47pass_context = click.make_pass_decorator(Context, ensure=True)48# ------------------------------------------------------------------------------49#50#51#52# ------------------------------------------------------------------------------53class Utils:54    runner_init_count = 155    runner_time_check_running = 156    runner_text_it_is_running = [57        "...yep, still running",58        "...no stress, process still running",59        "...process is aaalive ;)",60        "...we current still processing, please wait ... loooong time :P",61        "...still running bro"62    ]63    # --------------------------------------------------------------------------64    #65    #66    #67    # --------------------------------------------------------------------------68    def __init__(self, ctx: Context):69        self.update(ctx, is_init=True)70    def update(self, ctx: Context, is_init: bold = False):71        self.ctx = ctx72        if not is_init and LOGGING_LEVEL == logging.getLevelName(logging.DEBUG):73            print()74            print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')75            logging.log(logging.DEBUG, f'LOGGING-LEVEL          : {bold(LOGGING_LEVEL)}')76            logging.log(logging.DEBUG, f'DISABLED SPLIT PROJECT : {bold(self.ctx.disable_split_project)}')77            logging.log(logging.DEBUG, f'DISABLED SPLIT HOST    : {bold(self.ctx.disable_split_host)}')78            logging.log(logging.DEBUG, f'PRINT ONLY MODE        : {bold(self.ctx.print_only_mode)}')79            logging.log(logging.DEBUG, f'PROJECT-PATH           : {bold(self.create_service_path(None))}{bold("/")}')80            logging.log(logging.DEBUG, f'ENV-MODE               : {bold(ENV_MODE)}')81            print('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~')82            print()83    # --------------------------------------------------------------------------84    #85    # path | folder | file - helper86    #87    # --------------------------------------------------------------------------88    def create_folder(self, path: str) -> None:89        Path(path).mkdir(parents=True, exist_ok=True, mode=0o700)90    def get_user_path(self) -> str:91        return str(Path.home())92    def create_service_folder(self, name: str, host: Union[str, None] = None) -> str:93        path = f'{self.create_service_path(host)}/{name}'94        self.create_folder(path)95        logging.log(logging.DEBUG, f'new folder created:: {path}')96        return path97    def create_service_path(self, host: Union[str, None] = None):98        if not self.ctx.disable_split_host and host is not None:99            host = self.slugify(host)100            host = '' if host is None else f'/{host}'101        else:102            host = ''103        if not self.ctx.disable_split_project:104            project = '' if self.ctx.project is None else f'/{self.ctx.project}'105        else:106            project = ''107        if self.ctx.base_path[-1] == '/':108            self.ctx.base_path = self.ctx.base_path[:-1]109        return f'{self.ctx.base_path}{project}{host}'110    # --------------------------------------------------------------------------111    #112    # command exec - helper113    #114    # --------------------------------------------------------------------------115    def run_command_endless(self, command_list=[]) -> None:116        sub_p: Union[Popen[bytes], None] = None117        is_running = True118        try:119            index_to_check = 0120            index_to_check = 1 if command_list[index_to_check] == 'sudo' else index_to_check121            # if sudo is in command, first check into root122            if index_to_check == 1:123                if self.prompt_sudo() != 0:124                    sys.exit(4)125            logging.log(verboselogs.NOTICE, ' '.join(command_list))126            if self.is_tool(command_list[index_to_check]):127                with Popen(command_list) as sub_p:128                    while is_running:129                        time.sleep(600)130            else:131                logging.log(logging.ERROR, f'the command "{command_list[index_to_check]}", did not exist')132        except (SystemExit, KeyboardInterrupt) as k:133            logging.log(logging.WARNING, f'process interupted! ({k})')134        except Exception as e:135            logging.log(logging.CRITICAL, e, exc_info=True)136        is_running = False137        try:138            if sub_p is not None:139                sub_p.terminate()140        except Exception:141            pass142        try:143            if sub_p is not None:144                while sub_p.poll() is None:145                    time.sleep(1)146        except Exception:147            pass148    def run_command(self, command_list: List[str] = [], input_value: Union[str, None] = None149                    ) -> Tuple[Union[str, None], Union[str, None], bool]:150        sub_std_res: Union[str, None] = None151        sub_err_res: Union[str, None] = None152        is_interrupted: bool = False153        if not self.ctx.print_only_mode:154            try:155                sub_std: Union[bytes, str, None] = None156                sub_err: Union[bytes, str, None] = None157                index_to_check = 0158                index_to_check = 1 if command_list[index_to_check] == 'sudo' else index_to_check159                # if sudo is in command, first check into root160                if index_to_check == 1:161                    if self.prompt_sudo() != 0:162                        sys.exit(4)163                if self.is_tool(command_list[index_to_check]):164                    if input_value is None:165                        # , start_new_session=True166                        with Popen(command_list, stdout=PIPE, stderr=PIPE) as sub_p:167                            sub_std, sub_err, is_interrupted = self.subprocess_handler(168                                sub_p=sub_p, input_value=input_value, command=command_list[index_to_check])169                    else:170                        with Popen(command_list, stdout=PIPE, stderr=PIPE, stdin=PIPE) as sub_p:171                            sub_std, sub_err, is_interrupted = self.subprocess_handler(172                                sub_p=sub_p, input_value=input_value, command=command_list[index_to_check])173                else:174                    logging.log(logging.ERROR, f'the command "{command_list[index_to_check]}", did not exist')175                    sub_err = b"MISSING_COMMAND"176                if sub_std is not None and isinstance(sub_std, bytes):177                    sub_std_res = sub_std.decode()178                if sub_err is not None and isinstance(sub_err, bytes) and len(sub_err) > 0:179                    sub_err_res = sub_err.decode()180                    logging.log(logging.ERROR, sub_err.split(b'\n'))181            except KeyboardInterrupt as k:182                logging.log(logging.WARNING, f'process interupted! ({k})')183                is_interrupted = True184            except Exception as e:185                logging.log(logging.CRITICAL, e, exc_info=True)186        return (sub_std_res, sub_err_res, is_interrupted)187    def subprocess_handler(self, sub_p: Popen[Any], input_value: Union[str, None] = None,188                           command: Union[str, None] = None189                           ) -> Tuple[Union[bytes, None], Union[bytes, None], bool]:190        sub_std: Union[bytes, None] = None191        sub_err: Union[bytes, None] = None192        sub_p_std: Union[IO[bytes], bytes, None] = None193        sub_p_err: Union[IO[bytes], None] = None194        is_interrupted: bool = False195        try:196            if sub_p.stdin is not None and input_value is not None:197                sub_p.stdin.write(input_value.encode())198                sub_p.stdin.close()199            if sub_p.poll() is None:200                if not self.ctx.terminal_read_mode or (command is not None and command == 'tee'):201                    time.sleep(self.runner_time_check_running)202                    with PixelSpinner('Processing... ') as spinner:203                        while sub_p.poll() is None:204                            if self.runner_init_count % 6 == 0:205                                spinner.message = f'{random.choice(self.runner_text_it_is_running)} '206                                self.runner_init_count = 1207                            spinner.next()208                            self.runner_init_count += 1209                            time.sleep(self.runner_time_check_running)210                    if sub_p.stdout is not None:211                        sub_p_std = sub_p.stdout212                else:213                    logging.log(214                        logging.INFO, 'you run in terminal read mode, some function can maybe not print anything and you will see longer no response, please wait ...')215                    for stdout_line in sub_p.stdout:216                        if stdout_line is not None and len(stdout_line) > 0:217                            if sub_p_std is None:218                                sub_p_std = stdout_line219                            else:220                                sub_p_std += stdout_line221                            logging.log(logging.INFO, stdout_line.decode().replace('\n', ''))222            if sub_p.stderr is not None:223                sub_p_err = sub_p.stderr224        except (SystemExit, KeyboardInterrupt):225            is_interrupted = True226            if not self.ctx.terminal_read_mode:227                if sub_p.stdout is not None:228                    sub_p_std = sub_p.stdout229            if sub_p.stderr is not None:230                sub_p_err = sub_p.stderr231            try:232                sub_p.kill()233            except Exception:234                pass235        if isinstance(sub_p_std, bytes):236            sub_std = sub_p_std237        if isinstance(sub_p_std, BufferedReader):238            sub_std = sub_p_std.read()239        if isinstance(sub_p_err, BufferedReader):240            sub_err = sub_p_err.read()241        return (sub_std, sub_err, is_interrupted)242    def is_tool(self, name: str) -> bool:243        '''244            Check whether `name` is on PATH and marked as executable.245        '''246        return which(name) is not None247    def run_command_output_loop(self, msg: str, cmds: List[List[str]] = [], output: bool = True) -> Union[str, None]:248        '''249            run command from list in a loop, and also optional pipe them into each other250            default exec function is "run_command" with different251        '''252        cmd_result: Union[str, None] = None253        is_interrupted: bool = False254        try:255            log_runBanner(msg)256            if len(cmds) <= 1:257                output = False258            for cmd in cmds:259                if not is_interrupted or cmd[0] == 'tee':260                    logging.log(verboselogs.NOTICE, ' '.join(cmd))261                    if output:262                        cmd_result, std_err, is_interrupted = self.run_command(263                            command_list=cmd, input_value=cmd_result)264                    else:265                        cmd_result, std_err, is_interrupted = self.run_command(command_list=cmd)266                    if std_err is not None and std_err == "MISSING_COMMAND":267                        cmd_result = None268                        break269                    if cmd_result is not None:270                        if len(cmd_result) > 0:271                            logging.log(verboselogs.SPAM, f'output is:\n{cmd_result}')272                        else:273                            cmd_result = None274                            if output:275                                logging.log(logging.WARNING, 'no result available to pipe')276                                break277                    elif output:278                        logging.log(logging.WARNING, 'no result available to pipe')279                        break280        except KeyboardInterrupt as k:281            logging.log(logging.WARNING, f'process interupted! ({k})')282            raise KeyboardInterrupt283        except Exception as e:284            logging.log(logging.CRITICAL, e, exc_info=True)285        if is_interrupted and cmd_result is None:286            raise KeyboardInterrupt287        return cmd_result288    # --------------------------------------------------------------------------289    #290    #291    #292    # --------------------------------------------------------------------------293    def group(self, flat: List[Any], size: int) -> List[Any]:294        '''295            group list a flat list into a matrix of "size"296        '''297        return [flat[i:i+size] for i in range(0, len(flat), size)]298    def normalize_caseless(self, text: str) -> str:299        '''300            lowercase a string, for any unicode301        '''302        return unicodedata.normalize('NFKD', text.casefold())303    def slugify(self, value: Union[str, None], allow_unicode: bool = False) -> Union[str, None]:304        '''305            https://github.com/django/django/blob/main/django/utils/text.py306        '''307        value = str(value)308        if allow_unicode:309            value = unicodedata.normalize('NFKC', value)310        else:311            value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore').decode('ascii')312        value = re.sub(r'[^\w\s-]', '', value.lower())313        return re.sub(r'[-\s]+', '-', value).strip('-_')314    # --------------------------------------------------------------------------315    #316    #317    #318    # --------------------------------------------------------------------------319    def in_sudo_mode(self) -> None:320        '''321            If the user doesn't run the program with super user privileges, don't allow them to continue.322        '''323        if 'SUDO_UID' not in os.environ.keys():324            logging.log(logging.ERROR, 'Try running this program with sudo.')325            sys.exit(1)326    def prompt_sudo(self) -> int:327        try:328            if os.geteuid() != 0:329                msg = "hay [sudo] password for %u: "330                return check_call(f"sudo -v -p '{msg}'", shell=True)331        except Exception:332            pass333        return -1334    def get_ip_address(self) -> Union[str, None]:335        IP = None336        st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)337        try:338            st.connect(('10.255.255.255', 1))339            IP = st.getsockname()[0]340        except Exception:341            IP = '127.0.0.1'342        finally:343            st.close()344        return IP345    def uri_validator(self, url: str) -> Union[str, None]:346        try:347            if url.endswith('/'):348                url = url[:-1]349            result = urlparse(url)350            if all([result.scheme, result.netloc]):351                return url352        except Exception:353            pass354        return None355    def geo(self) -> Union[str, None]:356        '''357            This is a geo test example358        '''359        try:360            return Locator(ctx=self.ctx).check_database()361        except Exception as e:362            logging.log(logging.CRITICAL, e, exc_info=True)363        return None364    # --------------------------------------------------------------------------365    #366    #367    #368    # --------------------------------------------------------------------------369    def progress(self, id: int, value: int, description: str = "Processing", maxval: int = 100) -> None:370        try:371            # if self.ctx.progress.get(id) is None:372            #     self.ctx.progress[id] = tqdm(total=maxval, desc=description, colour="#000", leave=False)373            # if self.ctx.progress.get(id) is not None:374            #     bar = self.ctx.progress.get(id)375            #     bar.update(value)376            if self.ctx.progress.get(id) is None:377                self.ctx.progress[id] = ProgressBar(378                    widgets=[description, ' [', Timer(), '] ', Bar(marker='O'), ' [', Counter(379                        format='%(value)02d/%(max_value)d'), ']', ' (', ETA(), ') '],380                    maxval=maxval).start()381            bar_p: ProgressBar = self.ctx.progress.get(id)382            bar_p.update(value=value)383            if value >= maxval:384                print()385        except Exception as e:386            logging.log(logging.CRITICAL, e, exc_info=True)387    # def nmap_process(self, msg: str, host: str, options: List[str], safe_mode: bool = True) -> NmapReport:388    #     try:389    #         log_runBanner(msg)390    #         logging.log(verboselogs.NOTICE, f'nmap {" ".join(host)} {" ".join(options)}')391    #         if not self.ctx.print_only_mode:392    #             nmap_proc: NmapProcess = NmapProcess(targets=host, options=' '.join(options), safe_mode=safe_mode)393    #             nmap_proc.run_background()394    #             while nmap_proc.is_running():395    #                 self.progress(100, float(nmap_proc.progress))396    #                 time.sleep(0.01)397    #             self.progress(100, 100)398    #             if nmap_proc.stderr is not None:399    #                 if "QUITTING" in nmap_proc.stderr:400    #                     logging.log(logging.CRITICAL, nmap_proc.stderr)401    #                     return None402    #                 logging.log(logging.WARNING, nmap_proc.stderr)403    #             return NmapParser.parse(nmap_proc.stdout)404    #     except Exception as e:405    #         logging.log(logging.CRITICAL, e, exc_info=True)406    # --------------------------------------------------------------------------407    #408    #409    #410    # --------------------------------------------------------------------------411    def define_option_list(self, options: str, default_options: List[Any] = [],412                           options_append: bool = False, default_split_by: str = ',') -> List[Any]:413        '''414            defines a list of option to use in a callable service415            to define how to create this list416            by:417                - create it from a default only418                - create it from params only419                - create it by combine default and params420        '''421        try:422            result: List[Any] = []423            # add options from params424            if options is not None and not options_append:425                result = [options]  # .split(default_split_by)426            # add options from params to existing options427            elif options is not None and options_append:428                result = default_options + [options]  # .split(default_split_by)429            # use existing options430            else:431                result = default_options432            return result433        except Exception as e:434            logging.log(logging.CRITICAL, e, exc_info=True)...trolley_tuner_base.py
Source:trolley_tuner_base.py  
1import logging2from abc import ABC3from eva.lib.parameters import Parameters4from eva.robots.trolley_base import TrolleyBase5from eva.tuner.tuner_base import TunerBase6logger = logging.getLogger()7class TrolleyTunerBase(TrolleyBase, TunerBase, ABC):8    MAX_STEPS = 79    def __init__(self):10        super(TrolleyTunerBase, self).__init__()11        self._params = Parameters({'is_interrupted': False})12    def _maximize_params(self, get_params, start_value=0.0, end_value=1.0, step=0):13        current_value = (start_value + end_value) * 0.514        if step >= self.MAX_STEPS:15            return start_value16        self._restore_and_update_params(get_params(current_value))17        self._run()18        if self._is_system_stable():19            return self._maximize_params(get_params, current_value, end_value, step + 1)20        else:21            return self._maximize_params(get_params, start_value, current_value, step + 1)22    def _process(self):23        self._params.backup()24    def _restore_and_update_params(self, params):25        self._params.restore()26        self._params.update(params)27    def _prepare(self):28        # Set manually to the initial position29        self._wait_button_press()30        super(TrolleyTunerBase, self)._prepare()31    def _is_system_stable(self):32        return not self._params.is_interrupted33    def _stopping(self, measure):34        result = self._check_button_hold()35        if result is not None:36            self._params.is_interrupted = result37            return True...interrupt_util.py
Source:interrupt_util.py  
...10    interrupted = False11    def set_interrupted(signum, frame):12        nonlocal interrupted13        interrupted = True14    def is_interrupted():15        nonlocal interrupted16        return interrupted17    # Replace the current interrupt handler18    original_sigint_handler = signal.getsignal(signal.SIGINT)19    signal.signal(signal.SIGINT, set_interrupted)20    try:21        yield is_interrupted22    finally:23        # Restore original interrupt handler24        signal.signal(signal.SIGINT, original_sigint_handler)25if __name__ == "__main__":26    import time27    for i in range(5):28        print(i)29        time.sleep(1)30    print("-- Interrupts will now simply halt the loop --")31    with interrupt_catcher() as is_interrupted:32        for i in range(5):33            if is_interrupted():34                break35            print(i)36            time.sleep(1)37    print("-- Interrupts are back to normal --")38    for i in range(5):39        print(i)...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
