Best Python code snippet using autotest_python
test_command_factory.py
Source:test_command_factory.py  
"""Unit tests for ``galaxy.jobs.command_factory.build_command``.

Each test configures a ``MockJobWrapper`` and a ``Bunch``-based runner, calls
``build_command`` and compares the returned shell command string with the
expected one.
"""
import os
import shutil
from os import getcwd
from tempfile import mkdtemp
from typing import (
    List,
    Tuple,
)
from unittest import TestCase

from galaxy.jobs.command_factory import (
    build_command,
    PREPARE_DIRS,
    SETUP_GALAXY_FOR_METADATA,
)
from galaxy.tool_util.deps.container_classes import TRAP_KILL_CONTAINER
from galaxy.util.bunch import Bunch

MOCK_COMMAND_LINE = "/opt/galaxy/tools/bowtie /mnt/galaxyData/files/000/input000.dat"
TEST_METADATA_LINE = "set_metadata_and_stuff.sh"
TEST_FILES_PATH = "file_path"
TEE_REDIRECT = '> "$__out" 2> "$__err"'
RETURN_CODE_CAPTURE = "; return_code=$?; echo $return_code > galaxy_1.ec"
CP_WORK_DIR_OUTPUTS = '; \nif [ -f "foo" ] ; then cp "foo" "bar" ; fi'


class TestCommandFactory(TestCase):
    """Exercise ``build_command`` with ``stream_stdout_stderr`` disabled.

    Subclasses may override ``stream_stdout_stderr``, ``TEE_LOG`` and
    ``CAPTURE_AND_REDIRECT`` to rerun the same tests against a different
    expected command layout.
    """

    # Forwarded to build_command() as the ``stream_stdout_stderr`` keyword.
    stream_stdout_stderr = False
    # Text expected between PREPARE_DIRS and the tool command line.
    TEE_LOG = " "
    # Text expected right after the tool command line.
    CAPTURE_AND_REDIRECT = f"> '../outputs/tool_stdout' 2> '../outputs/tool_stderr'{RETURN_CODE_CAPTURE}"

    def setUp(self):
        """Create a scratch job directory, a mock job wrapper and a mock runner."""
        self.job_dir = mkdtemp()
        self.job_wrapper = MockJobWrapper(self.job_dir)
        self.workdir_outputs: List[Tuple[str, str]] = []

        def workdir_outputs(job_wrapper, **kwds):
            # Returns whatever the running test stored in self.workdir_outputs.
            assert job_wrapper == self.job_wrapper
            return self.workdir_outputs

        self.runner = Bunch(
            app=Bunch(model=Bunch(Dataset=Bunch(file_path=TEST_FILES_PATH))),
            get_work_dir_outputs=workdir_outputs,
        )
        self.include_metadata = False
        self.include_work_dir_outputs = True

    def tearDown(self):
        """Remove the scratch job directory created in ``setUp``."""
        shutil.rmtree(self.job_dir)

    def test_simplest_command(self):
        """A bare command line is only wrapped by the standard prefix/suffix."""
        self.include_work_dir_outputs = False
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE))

    def test_shell_commands(self):
        """Dependency shell commands are expected in front of the tool command."""
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self._assert_command_is(self._surround_command(f"{dep_commands[0]}; {MOCK_COMMAND_LINE}"))

    def test_shell_commands_external(self):
        """With ``commands_in_new_shell`` the command is expected in ``tool_script.sh``."""
        self.job_wrapper.commands_in_new_shell = True
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self._assert_command_is(
            self._surround_command(f"{self.job_wrapper.shell} {self.job_wrapper.working_directory}/tool_script.sh")
        )
        self.__assert_tool_script_is(f"#!/bin/sh\n{dep_commands[0]}; {MOCK_COMMAND_LINE}")

    def test_remote_dependency_resolution(self):
        """With ``dependency_resolution="remote"`` no dependency commands are expected."""
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self._assert_command_is(
            self._surround_command(MOCK_COMMAND_LINE),
            remote_command_params=dict(dependency_resolution="remote"),
        )

    def test_explicit_local_dependency_resolution(self):
        """With ``dependency_resolution="local"`` the dependency commands are expected."""
        self.include_work_dir_outputs = False
        dep_commands = [". /opt/galaxy/tools/bowtie/default/env.sh"]
        self.job_wrapper.dependency_shell_commands = dep_commands
        self._assert_command_is(
            self._surround_command(f"{dep_commands[0]}; {MOCK_COMMAND_LINE}"),
            remote_command_params=dict(dependency_resolution="local"),
        )

    def test_task_prepare_inputs(self):
        """``prepare_input_files_cmds`` entries are expected before the tool command."""
        self.include_work_dir_outputs = False
        self.job_wrapper.prepare_input_files_cmds = ["/opt/split1", "/opt/split2"]
        self._assert_command_is(self._surround_command(f"/opt/split1; /opt/split2; {MOCK_COMMAND_LINE}"))

    def test_workdir_outputs(self):
        """A work-dir output pair is expected as a guarded ``cp`` after the command."""
        self.include_work_dir_outputs = True
        self.workdir_outputs = [("foo", "bar")]
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE, CP_WORK_DIR_OUTPUTS))

    def test_workdir_outputs_with_glob(self):
        """A ``*`` in the source path is expected to stay outside the quotes."""
        self.include_work_dir_outputs = True
        self.workdir_outputs = [("foo*bar", "foo_x_bar")]
        self._assert_command_is(
            self._surround_command(
                MOCK_COMMAND_LINE,
                '; \nif [ -f "foo"*"bar" ] ; then cp "foo"*"bar" "foo_x_bar" ; fi',
            )
        )

    def test_set_metadata_skipped_if_unneeded(self):
        """No metadata step is expected while ``metadata_line`` is left as ``None``."""
        self.include_metadata = True
        self.include_work_dir_outputs = False
        self._assert_command_is(self._surround_command(MOCK_COMMAND_LINE))

    def test_set_metadata(self):
        """The metadata line is expected after the tool command."""
        self._test_set_metadata()

    def test_strips_trailing_semicolons(self):
        """A trailing ``;`` on the command line must not appear in the result."""
        self.job_wrapper.command_line = f"{MOCK_COMMAND_LINE};"
        self._test_set_metadata()

    def _test_set_metadata(self):
        """Shared body: assert the command ends with the metadata step."""
        self.include_metadata = True
        self.include_work_dir_outputs = False
        self.job_wrapper.metadata_line = TEST_METADATA_LINE
        expected_command = self._surround_command(
            MOCK_COMMAND_LINE,
            f"; cd '{self.job_dir}'; {SETUP_GALAXY_FOR_METADATA}{TEST_METADATA_LINE}",
        )
        self._assert_command_is(expected_command)

    def test_empty_metadata(self):
        """
        Test empty metadata as produced by TaskWrapper.
        """
        self.include_metadata = True
        self.include_work_dir_outputs = False
        self.job_wrapper.metadata_line = " "
        # An empty metadata command adds nothing after the ``cd`` into the job directory.
        expected_command = self._surround_command(MOCK_COMMAND_LINE, f"; cd '{self.job_dir}'")
        self._assert_command_is(expected_command)

    def test_metadata_kwd_defaults(self):
        """Check the keyword arguments recorded when no ``metadata_kwds`` are given."""
        configured_kwds = self.__set_metadata_with_kwds()
        assert configured_kwds["exec_dir"] == getcwd()
        assert configured_kwds["tmp_dir"] == self.job_wrapper.working_directory
        assert configured_kwds["dataset_files_path"] == TEST_FILES_PATH
        assert configured_kwds["output_fnames"] == ["output1"]

    def test_metadata_kwds_overrride(self):
        """Check that explicit ``metadata_kwds`` are the ones recorded."""
        configured_kwds = self.__set_metadata_with_kwds(
            exec_dir="/path/to/remote/galaxy",
            tmp_dir="/path/to/remote/staging/directory/job1",
            dataset_files_path="/path/to/remote/datasets/",
            output_fnames=["/path/to/remote_output1"],
        )
        assert configured_kwds["exec_dir"] == "/path/to/remote/galaxy"
        assert configured_kwds["tmp_dir"] == "/path/to/remote/staging/directory/job1"
        assert configured_kwds["dataset_files_path"] == "/path/to/remote/datasets/"
        assert configured_kwds["output_fnames"] == ["/path/to/remote_output1"]

    def __set_metadata_with_kwds(self, **kwds):
        """Build a command with metadata enabled and return the recorded kwds.

        ``kwds``, when non-empty, are passed to ``build_command`` as
        ``remote_command_params["metadata_kwds"]``. The return value is what the
        mock wrapper's ``setup_external_metadata`` was called with.
        """
        self.include_metadata = True
        self.include_work_dir_outputs = False
        self.job_wrapper.metadata_line = TEST_METADATA_LINE
        if kwds:
            self.__command(remote_command_params=dict(metadata_kwds=kwds))
        else:
            self.__command()
        return self.job_wrapper.configured_external_metadata_kwds

    def _assert_command_is(self, expected_command, **command_kwds):
        """Build the command with ``command_kwds`` and compare it to ``expected_command``."""
        command = self.__command(**command_kwds)
        self.assertEqual(command, expected_command)

    def __assert_tool_script_is(self, expected_command):
        """Compare the contents of ``tool_script.sh`` with ``expected_command``."""
        # Use a context manager so the handle is closed deterministically;
        # a bare open(...).read() leaks it and triggers ResourceWarning.
        with open(self.__tool_script) as script:
            actual_command = script.read()
        self.assertEqual(actual_command, expected_command)

    @property
    def __tool_script(self):
        """Path of ``tool_script.sh`` inside the scratch job directory."""
        return os.path.join(self.job_dir, "tool_script.sh")

    def __command(self, **extra_kwds):
        """Call ``build_command`` with this test's current settings plus ``extra_kwds``."""
        kwds = dict(
            runner=self.runner,
            job_wrapper=self.job_wrapper,
            include_metadata=self.include_metadata,
            include_work_dir_outputs=self.include_work_dir_outputs,
            stream_stdout_stderr=self.stream_stdout_stderr,
            **extra_kwds,
        )
        return build_command(**kwds)

    def _surround_command(self, command, post_command=""):
        """Return ``command`` wrapped in the expected prefix and suffix.

        The first ``galaxy_1.ec`` occurrence is rewritten to a path inside the
        job wrapper's working directory.
        """
        command = f'''{PREPARE_DIRS};{self.TEE_LOG}{command} {self.CAPTURE_AND_REDIRECT}{post_command}; sh -c "exit $return_code"'''
        return command.replace("galaxy_1.ec", os.path.join(self.job_wrapper.working_directory, "galaxy_1.ec"), 1)


class TestCommandFactoryStreamStdoutStderr(TestCommandFactory):
    """Rerun the inherited tests with ``stream_stdout_stderr`` enabled."""

    stream_stdout_stderr = True
    TEE_LOG = """ __out="${TMPDIR:-.}/out.$$" __err="${TMPDIR:-.}/err.$$"
mkfifo "$__out" "$__err"
trap 'rm -f "$__out" "$__err"' EXIT
tee -a '../outputs/tool_stdout' < "$__out" &
tee -a '../outputs/tool_stderr' < "$__err" >&2 & """
    CAPTURE_AND_REDIRECT = f"{TEE_REDIRECT}{RETURN_CODE_CAPTURE}"

    def test_kill_trap_replaced(self):
        """A ``TRAP_KILL_CONTAINER`` prefix is expected to fold into the EXIT trap."""
        self.include_work_dir_outputs = False
        self.job_wrapper.command_line = f"{TRAP_KILL_CONTAINER}{MOCK_COMMAND_LINE}"
        expected_command_line = self._surround_command(MOCK_COMMAND_LINE).replace(
            """trap 'rm -f "$__out" "$__err"' EXIT""",
            """trap 'rm -f "$__out" "$__err"; _on_exit' EXIT""",
        )
        self._assert_command_is(expected_command_line)


class MockJobWrapper:
    """Stand-in object passed to ``build_command`` as ``job_wrapper``."""

    def __init__(self, job_dir):
        """Initialise defaults; tests overwrite individual attributes as needed."""
        self.strict_shell = False
        self.command_line = MOCK_COMMAND_LINE
        self.dependency_shell_commands = []
        self.metadata_line = None
        # Filled in by setup_external_metadata() with the kwds it receives.
        self.configured_external_metadata_kwds = None
        self.working_directory = job_dir
        self.prepare_input_files_cmds = None
        self.commands_in_new_shell = False
        self.app = Bunch(
            config=Bunch(
                check_job_script_integrity=False,
            )
        )
        self.shell = "/bin/sh"
        self.use_metadata_binary = False
        self.job_id = 1

    def get_command_line(self):
        """Return the configured tool command line."""
        return self.command_line

    def container_monitor_command(self, *args, **kwds):
        """Always return ``None``; all arguments are ignored."""
        return None

    @property
    def requires_setting_metadata(self):
        """``True`` once a test has assigned a ``metadata_line``."""
        return self.metadata_line is not None

    def setup_external_metadata(self, *args, **kwds):
        """Record the keyword arguments and return the configured metadata line."""
        self.configured_external_metadata_kwds = kwds
        return self.metadata_line

    @property
    def job_io(self):
        # NOTE(review): the body of this property is cut off in the visible
        # source ("def job_io(self):..."); restore it from the original file.
        ...


# --- Trailing page text preserved from the scraped source ---
# Learn to execute automation testing from scratch with LambdaTest Learning Hub.
# Right from setting up the prerequisites to run your first automation test, to
# following best practices and diving deeper into advanced test scenarios.
# LambdaTest Learning Hubs compile a list of step-by-step guides to help you be
# proficient with different test automation frameworks i.e. Selenium, Cypress,
# TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!
