Best Python code snippet using avocado_python
profiler_handler.py
Source:profiler_handler.py  
1from nodewatts.config import NWConfig2from nodewatts.subprocess_manager import NWSubprocessError, NWSubprocessTimeout, SubprocessManager3from nodewatts.error import NodewattsError4import os5import shutil6import json7import logging8from datetime import datetime9from typing import Tuple10import time11import subprocess12import pwd13logger = logging.getLogger("Main")14# Note:15# Must force user to ensure correct version of node is the default,16# If the the project requires an older version, supply and nvm exec command17# and ensure bash recognizes it.18class ProfilerException(NodewattsError):19    def __init__(self, msg: str, *args, **kwargs):20        super().__init__(msg, *args, **kwargs)21class ProfilerInitError(NodewattsError):22    def __init__(self, msg: str, *args, **kwargs):23        super().__init__(msg, *args, **kwargs)24class ProfilerHandler():25    def __init__(self, conf: NWConfig, manager: SubprocessManager):26        self.root = conf.root_path27        self.entry_full_path = os.path.join(conf.root_path, conf.entry_file)28        self.entry_basepath, self.entry_filename = self._parse_entry_filepath(29                                        self.entry_full_path30                                    )31        self.commands = conf.commands32        self.profile_title = datetime.now().isoformat()33        self.tmp_path = None34        self.socket_port = conf.profiler_port35        self.proc_manager = manager36        self.profiler_env_vars = {}37        self.es6 = conf.es638        self.test_runs = conf.test_runs39        self.server_process = None40        self.test_runner_timeout = conf.test_runner_timeout41        self.deps_installed = False42        self.code_injected = False43        self.fail_code = None44        self.user = conf.user45        self.aliased_npm_requirements = [46            "nw-zeromq@npm:zeromq@6.0.0-beta.6", "nw-prof@npm:v8-profiler-next"]47        self._db_service_root = os.path.join(48            NWConfig.package_root, "resources/javascript/nodewatts_cpu_profile_db")49        self._profiler_scripts_root = os.path.join(50            NWConfig.package_root, "resources/javascript/nodewatts_profiler_agent")51        self.server_wait = conf.server_startup_wait52        self.aliased_npm_requirements = [53            "nw-zeromq@npm:zeromq@6.0.0-beta.6", "nw-prof@npm:v8-profiler-next"]54        self.profiler_env_vars["PATH_TO_DB_SERVICE"] = None55        self.profiler_env_vars["PROFILE_TITLE"] = self.profile_title56        self.profiler_env_vars["TEST_SOCKET_PORT"] = str(self.socket_port)57        self.profiler_env_vars["TESTCMD"] = self.commands["runTests"]58        self.profiler_env_vars["ZMQ_INSTALLED_PATH"] = os.path.join(59            self.root, "node_modules/nw-zeromq")60        if conf.engine_conf_args["internal_db_uri"][-1] == "/":61            self.profiler_env_vars["NODEWATTS_DB_URI"] = conf.engine_conf_args["internal_db_uri"] + "nodewatts"62        else:63            self.profiler_env_vars["NODEWATTS_DB_URI"] = conf.engine_conf_args["internal_db_uri"] + "/nodewatts"64        65        self.profiler_env_vars["NODEWATTS_TMP_PATH"] = None66        if conf.use_nvm:67            if not conf.override_nvm_path:68                try:69                    self.nvm_path = self._resolve_nvm_path(conf.user, conf.node_version)70                except NodewattsError as e :71                    logger.error(str(e))72                    raise ProfilerInitError(None) from None73            else:74                self.nvm_path = conf.override_nvm_path75        else: 76            self.nvm_path = None77    def setup_env(self):78        # Replicate what appdir does to get user writable data file location79        pw_record = pwd.getpwnam(self.user)80        homedir = pw_record.pw_dir81        self.tmp_path = os.path.join(homedir,'.local','share','nodewatts')82        if not os.path.exists(self.tmp_path):83            try:84                os.mkdir(self.tmp_path)85                os.chmod(self.tmp_path, 0o777)86            except OSError as e:87                logger.error("Failed to create temporary data directory in user space. Message: " +str(e))88                raise ProfilerInitError(None)89        else:90            try:91                shutil.rmtree(self.tmp_path)92                os.mkdir(self.tmp_path)93                os.chmod(self.tmp_path, 0o777)94            except OSError as e:95                logger.error("Failed to create temporary data directory in user space. Message: " +str(e))96                raise ProfilerInitError(None)97        self.profiler_env_vars["NODEWATTS_TMP_PATH"] = self.tmp_path98        self._setup_db_service()99        self._save_copy_of_entry_file()100        self._inject_profiler_script()101        self._install_npm_dependencies()102    # Starts the web server process. Performs necessary cleanup and exits pacakge in case of 103    # failure. Safe to call directly. Return the PID of the server if successful104    def start_server(self) -> int:105        logger.debug("Starting cpu profiler.")106        self.server_process = self.proc_manager.project_process_async(107                "echo \"Running server with node version: $(which node)\" && "108                + self.commands["serverStart"], custom_env=self.profiler_env_vars, 109                inject_to_path=self.nvm_path)110        # Poll the server process in 1 second increments for the specified wait duration.111        # If the PID file isn't there after the wait time, we assume it hasn't started correctly.112        # However, cases remain where there server takes a long time to start up113        # and may report the PID before a fatal crash.114        # To handle these cases, the server process will be repeatedly polled115        # throughout the operations that follow its startup in order to catch these116        # cases and report crash info to the user.117        # Note that PID file is used since node is run as a child of shell the shell process118        # so we do not want to include the shell in the monitoring119        attempts = 0120        while True:121            retcode = self.server_process.poll()122            if retcode is None:123                if os.path.exists(os.path.join(self.tmp_path, "PID.txt")):124                    logger.debug(125                        "PID file located on attempt " + str(attempts+1)+". Proceeding.")126                    break127            else:128                logger.error("Web server did not start successfully. Exited with return code: " +129                                str(self.server_process.returncode))130                raise ProfilerException(None)131            attempts += 1132            if attempts == self.server_wait:133                logger.error("Failed to locate server PID in " + str(attempts) + " attempts. This could be an indication " +134                             "that the given web server took longer than "+str(self.server_wait)+" seconds to start, " +135                             "or that it prematurely exited with a return code of 0.")136                logger.info("To allow the server greater time to initilize, set \"dev-serverWait\" config " +137                            "option in the config file to the desired wait time in seconds.")138                raise ProfilerException(None)139            time.sleep(1.0)140        logger.debug("Server started successfully")141        with open(os.path.join(self.tmp_path, "PID.txt")) as f:142            pid = f.read()143        return pid144    # Runs provided test suite three times or cleans up and exits in case of failure145    def run_test_suite(self) -> None:146        logger.info("Running tests. This may take a moment.")147        for i in range(0,self.test_runs):148            logger.debug("Test Run " + str(i+1))149            cmd = "node " + \150                os.path.join(self._profiler_scripts_root, "test-runner.js")151            if self.server_process.poll() is None:152                if i == self.test_runs - 1:153                    self.profiler_env_vars["FINAL_RUN"] = "true"154                try:155                    stdout, stderr = self.proc_manager.project_process_blocking(156                        cmd, custom_env=self.profiler_env_vars, timeout=self.test_runner_timeout, inject_to_path=self.nvm_path)157                    logger.debug("Test Suite run successfully: \n" +158                                "stdout: \n" + stdout + "stderr: \n" + stderr)159                except NWSubprocessError as e:160                    logger.error("Failed to run test suite. Error: \n" + str(e))161                    raise ProfilerException(None)162                except NWSubprocessTimeout as e:163                    logger.error("Test suite process timeout out in " + str(self.test_runner_timeout) +164                                " seconds." + "If you believe the provided test suite requires longer than this" +165                                " to sucessfully complete, please configure the \"dev-testRunnerTimeout\" " +166                                "setting in the config file as necessary. \n" + "Test runner output before timeout: \n" + str(e))167                    raise ProfilerException(None)168            else:169                logger.error("Web server encounted an error. Exited with return code: " +170                                str(self.server_process.returncode))171                raise ProfilerException(None)172    def _inject_profiler_script(self, ES6=False) -> None:173        logger.debug("Injecting profiler code to entry file.")174        if self.es6 or self._is_es6():175            with open(os.path.join(self._profiler_scripts_root, "es6-imports.js")) as f:176                imports = f.read()177        else:178            with open(os.path.join(self._profiler_scripts_root, "imports.js")) as f:179                imports = f.read()180        with open(self.entry_full_path, "r+") as f:181            content = f.read()182            f.seek(0, 0)183            f.write(imports.rstrip('\r\n') + '\n' + content)184        with open(os.path.join(self._profiler_scripts_root, "profiler-socket.js")) as f:185            script = f.read()186        with open(self.entry_full_path, "a+") as f:187            f.write(script)188        self.code_injected = True189    def _setup_db_service(self):190        logger.info("Setting up NodeWatts Database Service...")191        dest_path = os.path.join(self.tmp_path, 'nodewatts_cpu_profile_db')192        if not os.path.exists(dest_path):193            try:194                shutil.copytree(self._db_service_root, dest_path)195            except Exception as e:196                logger.error("Failed to copy nodewatts service database to data directory. Error:" + str(e))197                raise ProfilerException(None)198        os.chmod(dest_path, 0o777)199        try:200            stdout, stderr = self.proc_manager.generic_user_process_blocking("npm install", cwd=dest_path, inject_to_path=self.nvm_path)201        except NWSubprocessError as e:202                    logger.error("Failed to install database service dependencies. Error: \n" + str(e))203                    raise ProfilerException(None)204        logger.debug("Successfull setup database service. stdout: \n" + stdout)205        self.profiler_env_vars["PATH_TO_DB_SERVICE"] = os.path.join(dest_path,'src/main/index.js')206    @staticmethod207    def _resolve_nvm_path(username:str, version:str) -> str:208        pw_record = pwd.getpwnam(username)209        homedir = pw_record.pw_dir210        nvm_path = os.path.join(homedir,".nvm" ,"versions","node","v"+version , "bin")211        if not os.path.exists(nvm_path):212            logger.error("Could not locate nvm path for specified version. Tried: " + nvm_path)213            raise ProfilerException(None)214        return nvm_path215    def _parse_entry_filepath(self, path: str) -> Tuple[str, str]:216        return os.path.split(path)217    def _save_copy_of_entry_file(self) -> None:218        shutil.copy2(self.entry_full_path, self.tmp_path)219    def _restore_entry_file(self) -> None:220        os.remove(self.entry_full_path)221        shutil.move(os.path.join(222            self.tmp_path, self.entry_filename), self.entry_basepath)223        shutil.chown(self.entry_full_path, user=self.user)224    # Installs required package versions that are aliased to avoid collisions if225    # user is already making use of the packages in the project226    def _install_npm_dependencies(self) -> None:227        logger.info("Installing npm dependencies. This may take a moment.")228        cmd = "npm i -D " + \229            " ".join(self.aliased_npm_requirements)230        try:231            stdout, stderr = self.proc_manager.project_process_blocking(cmd, inject_to_path=self.nvm_path)232        except NWSubprocessError as e:233            logger.error("Failed to install npm dependencies. Error:" + str(e))234            raise ProfilerInitError(None)235        logger.debug("Dependencies installed successfully. stdout: \n" + stdout + "\n"236                     + "stderr: \n" + stderr)237        self.deps_installed = True238    def _uninstall_npm_dependencies(self) -> None:239        logger.info("Uninstalling npm dependencies. This may take a moment.")240        aliases = ["nw-zeromq", "nw-prof"]241        uninstall = "npm uninstall " + " ".join(aliases)242        try:243            stdout, stderr = self.proc_manager.project_process_blocking(244                    uninstall, inject_to_path=self.nvm_path)245        except NWSubprocessError as e:246            logger.warning("Failed to uninstall the following temporary aliased npm packages: "247                            + " ".join(self.aliased_npm_requirements)248                            + ". These package will need to removed manually. NPM error message: \n"249                            + str(e))250        else: 251            logger.debug("Dependencies uninstalled successfully. stdout: \n" + stdout + "\n"252                        + "stderr: \n" + stderr)253            self.deps_installed = False254    def _is_es6(self) -> bool:255        package = self._load_package_file()256        if "type" in package and package["type"] == "module":257            self.es6 = True258        return self.es6259    def _load_package_file(self) -> dict:260        pkg_path = os.path.join(self.root, "package.json")261        if not os.path.exists(pkg_path):262            logger.error("Project root directory must include a package.json file.")263            raise ProfilerException(None)264        with open(pkg_path, "r") as f:265            package = json.load(f)266        return package267    def _log_server_output(self) -> None:268        try:269            output, _ = self.server_process.communicate(timeout=5)270            logger.debug("Server output: \n" + output)271        except subprocess.TimeoutExpired:272            logger.debug("Unable to terminate server process and retrieve output. " + 273            "Please ensure server exited successfully before runnging nodewatts again")274    def _shutdown_server(self) -> None:275        logger.debug("Shutting down server.")276        self.proc_manager.terminate_process_tree(self.server_process.pid)277        self._log_server_output()278    def cleanup(self) -> None:279        logger.debug("Cleaning up project directory.")280        if self.code_injected:281            self._restore_entry_file()282        if self.deps_installed:283            self._uninstall_npm_dependencies()284        if self.server_process is not None:285            if self.server_process.poll() is None:286                self._shutdown_server()287            else:288                logger.debug("Unexpected server exit with return code: " + str(self.server_process.poll()))289                self._log_server_output()...test_timeout.py
Source:test_timeout.py  
...12    reasonable length of time.13    """14    proc.kill()15    raise RuntimeError("Timeout popped.")16def test_runner_timeout():17    """Test the globsl Testplan timeout feature."""18    testplan_script = os.path.join(19        os.path.dirname(__file__), "timeout_test_plan.py"20    )21    assert os.path.isfile(testplan_script)22    current_proc = psutil.Process()23    start_procs = current_proc.children()24    output_json = tempfile.NamedTemporaryFile(suffix=".json").name25    try:26        proc = subprocess.Popen(27            [sys.executable, testplan_script, "--json", output_json],28            stdout=subprocess.PIPE,29            universal_newlines=True,30        )...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
