Best Python code snippet using autotest_python
test_commands.py
Source:test_commands.py  
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3"""Test scriptharness/commands/__init__.py4"""5from __future__ import absolute_import, division, print_function, \6                       unicode_literals7from contextlib import contextmanager8import logging9import mock10import os11import pprint12import scriptharness.commands as commands13from scriptharness.errorlists import ErrorList14from scriptharness.exceptions import ScriptHarnessError, \15    ScriptHarnessException, ScriptHarnessFatal, ScriptHarnessTimeout16import scriptharness.log as log17import scriptharness.status as status18from scriptharness.unicode import to_unicode19import shutil20import six21import subprocess22import sys23import time24import unittest25from . import LoggerReplacement26TEST_JSON = os.path.join(os.path.dirname(__file__), 'http', 'test_config.json')27TEST_DIR = "this_dir_should_not_exist"28TEST_COMMAND = [29    sys.executable, "-c",30    'from __future__ import print_function; print("hello");'31]32# Helper functions {{{133def cleanup():34    """Cleanliness"""35    if os.path.exists(TEST_DIR):36        shutil.rmtree(TEST_DIR)37def get_command(command=None, **kwargs):38    """Create a Command for testing39    """40    if command is None:41        command = TEST_COMMAND42    kwargs.setdefault('logger', LoggerReplacement())43    return commands.Command(command, **kwargs)44def get_parsed_command(command=None, **kwargs):45    """Create a ParsedCommand for testing46    """47    if command is None:48        command = TEST_COMMAND49    kwargs.setdefault('logger', LoggerReplacement())50    return commands.ParsedCommand(command, **kwargs)51@contextmanager52def get_output(command=None, **kwargs):53    """Create an Output for testing54    """55    if command is None:56        command = TEST_COMMAND57    kwargs.setdefault('logger', LoggerReplacement())58    cmd = commands.Output(command, **kwargs)59    try:60        yield cmd61    finally:62        cmd.cleanup()63def get_timeout_cmdlns():64    """Create a 
list of commandline commands to run to test timeouts.65    """66    cmdlns = [67        [sys.executable, "-c", "import time;time.sleep(300);"],68        [sys.executable, "-c",69         "from __future__ import print_function; import time;"70         "print('foo', end=' ');"71         "time.sleep(300);"],72    ]73    return cmdlns74# TestFunctions {{{175class TestFunctions(unittest.TestCase):76    """Test the command functions77    """78    def setUp(self):79        """Cleanliness"""80        assert self  # silence pylint81        cleanup()82    def tearDown(self):83        """Cleanliness"""84        assert self  # silence pylint85        cleanup()86    @mock.patch('scriptharness.commands.logging')87    def test_check_output(self, mock_logging):88        """test_commands | check_output()89        """90        logger = LoggerReplacement()91        mock_logging.getLogger.return_value = logger92        command = [sys.executable, "-mjson.tool", TEST_JSON]93        output = commands.check_output(command)94        self.assertEqual(95            logger.all_messages[0][1],96            commands.STRINGS["check_output"]["pre_msg"]97        )98        output2 = subprocess.check_output(command)99        self.assertEqual(output, output2)100    @mock.patch('scriptharness.commands.logging')101    def test_check_output_nolog(self, mock_logging):102        """test_commands | check_output() with no logging103        """104        logger = LoggerReplacement()105        mock_logging.getLogger.return_value = logger106        command = [sys.executable, "-mjson.tool", TEST_JSON]107        commands.check_output(command, log_output=False)108        self.assertEqual(109            logger.all_messages[0][1],110            commands.STRINGS["check_output"]["pre_msg"]111        )112        self.assertEqual(len(logger.all_messages), 1)113# TestDetectErrors {{{1114class TestDetectErrors(unittest.TestCase):115    """Test detect_errors()116    """117    def test_success(self):118        
"""test_commands | detect_errors() success119        """120        command = get_command()121        command.history['return_value'] = 0122        self.assertEqual(commands.detect_errors(command), status.SUCCESS)123    def test_failure(self):124        """test_commands | detect_errors() failure125        """126        command = get_command()127        for value in (1, None):128            command.history['return_value'] = value129            self.assertEqual(commands.detect_errors(command), status.ERROR)130# TestDetectParsedErrors {{{1131class TestDetectParsedErrors(unittest.TestCase):132    """Test detect_parsed_errors()133    """134    def test_success(self):135        """test_commands | detect_parsed_errors() success136        """137        error_list = ErrorList([])138        command = get_parsed_command(error_list=error_list)139        command.parser.history['num_errors'] = 0140        self.assertEqual(commands.detect_parsed_errors(command),141                         status.SUCCESS)142    def test_failure(self):143        """test_commands | detect_parsed_errors() failure144        """145        error_list = ErrorList([])146        command = get_parsed_command(error_list=error_list)147        for value in (1, 20):148            command.parser.history['num_errors'] = value149            self.assertEqual(commands.detect_parsed_errors(command),150                             status.ERROR)151# TestCommand {{{1152class TestCommand(unittest.TestCase):153    """Test Command()154    """155    def setUp(self):156        """Cleanliness"""157        assert self  # silence pylint158        cleanup()159    def tearDown(self):160        """Cleanliness"""161        assert self  # silence pylint162        cleanup()163    def test_simple_command(self):164        """test_commands | simple Command.run()165        """166        command = get_command()167        command.run()168        pprint.pprint(command.logger.all_messages)169        
self.assertEqual(command.logger.all_messages[-1][2][0], "hello")170    def test_log_env(self):171        """test_commands | Command.log_env()172        """173        env = {"foo": "bar"}174        command = get_command(175            env=env,176        )177        command.log_env(env)178        command.run()179        env_line = "'foo': 'bar'"180        if os.name != 'nt' and six.PY2:181            env_line = "u'foo': u'bar'"182        count = 0183        for line in command.logger.all_messages:184            if line[1] == commands.STRINGS['command']['env']:185                print(line)186                self.assertTrue(env_line in line[2][0]["env"])187                count += 1188        self.assertEqual(count, 2)189    def test_bad_cwd(self):190        """test_commands | Command bad cwd191        """192        command = get_command(cwd=TEST_DIR)193        self.assertRaises(ScriptHarnessException, command.run)194    def test_good_cwd(self):195        """test_commands | Command good cwd196        """197        os.makedirs(TEST_DIR)198        command = get_command(cwd=TEST_DIR)199        command.log_start()200        self.assertEqual(201            command.logger.all_messages[0][1],202            command.strings["start_with_cwd"],203        )204    def test_output_timeout(self):205        """test_commands | Command output_timeout206        """207        for cmdln in get_timeout_cmdlns():208            now = time.time()209            command = get_command(command=cmdln, output_timeout=.5)210            print(cmdln)211            self.assertRaises(ScriptHarnessTimeout, command.run)212            self.assertTrue(now + 1 > time.time())213    def test_timeout(self):214        """test_commands | Command timeout215        """216        for cmdln in get_timeout_cmdlns():217            now = time.time()218            command = get_command(command=cmdln, timeout=.5)219            print(cmdln)220            self.assertRaises(ScriptHarnessTimeout, command.run)221            
self.assertTrue(now + 1 > time.time())222    def test_command_error(self):223        """test_commands | Command.run() with error224        """225        command = get_command(226            command=[sys.executable, "-c", 'import sys; sys.exit(1)']227        )228        self.assertRaises(ScriptHarnessError, command.run)229    @mock.patch('scriptharness.commands.os')230    def test_fix_env(self, mock_os):231        """test_commands | Command.fix_env()232        """233        mock_os.name = 'nt'234        mock_os.environ = {u'SystemRoot': u'FakeSystemRoot'}235        command = get_command()236        env = {u'foo': u'bar', b'x': b'y'}237        if six.PY3:238            expected_env = {u'foo': u'bar', b'x': b'y',239                            u'SystemRoot': u'FakeSystemRoot'}240        else:241            expected_env = {b'foo': b'bar', b'x': b'y',242                            b'SystemRoot': b'FakeSystemRoot'}243        self.assertEqual(command.fix_env(env), expected_env)244# TestRun {{{1245class TestRun(unittest.TestCase):246    """test commands.run()247    """248    @mock.patch('scriptharness.commands.multiprocessing')249    def test_error(self, mock_multiprocessing):250        """test_commands | run() error251        """252        def raise_error(*args, **kwargs):253            """raise ScriptHarnessError"""254            if args or kwargs:  # silence pylint255                pass256            raise ScriptHarnessError("foo")257        mock_multiprocessing.Process = raise_error258        self.assertRaises(259            ScriptHarnessFatal, commands.run,260            "echo", halt_on_failure=True261        )262    @mock.patch('scriptharness.commands.multiprocessing')263    def test_timeout(self, mock_multiprocessing):264        """test_commands | run() timeout265        """266        def raise_error(*args, **kwargs):267            """raise ScriptHarnessTimeout"""268            if args or kwargs:  # silence pylint269                pass270            raise 
ScriptHarnessTimeout("foo")271        mock_multiprocessing.Process = raise_error272        self.assertRaises(273            ScriptHarnessFatal, commands.run,274            "echo", halt_on_failure=True275        )276    @mock.patch('scriptharness.commands.multiprocessing')277    def test_no_halt(self, mock_multiprocessing):278        """test_commands | run() halt_on_error=False279        """280        def raise_error(*args, **kwargs):281            """raise ScriptHarnessTimeout"""282            if args or kwargs:  # silence pylint283                pass284            raise ScriptHarnessTimeout("foo")285        mock_multiprocessing.Process = raise_error286        cmd = commands.run("echo", halt_on_failure=False)287        self.assertEqual(cmd.history['status'], status.TIMEOUT)288# TestParsedCommand {{{1289class TestParsedCommand(unittest.TestCase):290    """ParsedCommand()291    """292    def test_parser_kwarg(self):293        """test_commands | ParsedCommand() parser kwarg294        """295        error_list = ErrorList([296            {'substr': 'ell', 'level': logging.WARNING}297        ])298        logger = LoggerReplacement()299        parser = log.OutputParser(error_list, logger=logger)300        cmd = get_parsed_command(parser=parser)301        cmd.run()302        self.assertEqual(parser.history['num_warnings'], 1)303        pprint.pprint(logger.all_messages)304        self.assertEqual(305            logger.all_messages,306            [(logging.WARNING, ' hello', ())]307        )308    def test_bad_errorlist(self):309        """test_commands | ParsedCommand bad error_list310        """311        for error_list in (None, {'substr': 'asdf', 'level': logging.ERROR}):312            self.assertRaises(313                ScriptHarnessException, get_parsed_command,314                error_list315            )316    def test_parse(self):317        """test_commands | parse()318        """319        error_list = ErrorList([320            {'substr': 'ell', 'level': 
logging.WARNING}321        ])322        logger = LoggerReplacement()323        parser = log.OutputParser(error_list, logger=logger)324        cmd = commands.parse(TEST_COMMAND, parser=parser)325        self.assertTrue(isinstance(cmd, commands.ParsedCommand))326        self.assertEqual(parser.history['num_warnings'], 1)327        pprint.pprint(logger.all_messages)328        self.assertEqual(329            logger.all_messages,330            [(logging.WARNING, ' hello', ())]331        )332# Output {{{1333class TestOutput(unittest.TestCase):334    """Test Output()335    """336    def test_env_output(self):337        """test_commands | Output run env338        """339        cmd = [340            sys.executable, "-c",341            'from __future__ import print_function;import os;'342            'print(os.environ["foo"]);'343        ]344        with get_output(command=cmd, env={'foo': 'bar'}) as command:345            command.run()346            with open(command.stdout.name) as filehandle:347                output = filehandle.read()348            self.assertEqual(to_unicode(output).rstrip(), "bar")349            self.assertEqual(os.path.getsize(command.stderr.name), 0)350            # This will result in cleanup() being called twice;351            # idempotence test352            command.cleanup()353    def test_output_timeout(self):354        """test_commands | Output output_timeout355        """356        for cmdln in get_timeout_cmdlns():357            now = time.time()358            with get_output(command=cmdln, output_timeout=.5) as command:359                print(cmdln)360                self.assertRaises(ScriptHarnessTimeout, command.run)361                self.assertTrue(now + 1 > time.time())362    def test_timeout(self):363        """test_commands | Output timeout364        """365        for cmdln in get_timeout_cmdlns():366            now = time.time()367            with get_output(command=cmdln, timeout=.5) as command:368                print(cmdln)369    
            self.assertRaises(ScriptHarnessTimeout, command.run)370                self.assertTrue(now + 1 > time.time())371    def test_get_output_exception(self):372        """test_commands | Output.get_output() exception373        """374        with get_output() as command:375            self.assertRaises(376                ScriptHarnessException, command.get_output,377                handle_name="foo"378            )379    def test_get_output_binary(self):380        """test_commands | Output.get_output()381        """382        with get_output() as command:383            command.run()384            self.assertEqual(command.get_output(), "hello")385            self.assertEqual(386                to_unicode(command.get_output(text=False)).rstrip(),387                "hello"388            )389    def test_nonexistent_command(self):390        """test_commands | Output nonexistent command391        """392        with get_output(command=["this_command_should_not_exist"]) as command:393            self.assertRaises(ScriptHarnessError, command.run)394# TestGetOutput {{{1395class TestGetOutput(unittest.TestCase):396    """test commands.get_output() and get_text_output()397    The ScriptHarnessFatal tests are testing get_text_output() because398    it's trickier to test the contextmanager method directly.399    """400    @mock.patch('scriptharness.commands.subprocess')401    def test_error(self, mock_subprocess):402        """test_commands | get_text_output() error403        """404        def raise_error(*args, **kwargs):405            """raise ScriptHarnessError"""406            if args or kwargs:  # silence pylint407                pass408            raise ScriptHarnessError("foo")409        mock_subprocess.Popen = raise_error410        self.assertRaises(411            ScriptHarnessFatal, commands.get_text_output,412            "echo", halt_on_failure=True413        )414    @mock.patch('scriptharness.commands.subprocess')415    def test_timeout(self, 
mock_subprocess):416        """test_commands | get_text_output() timeout417        """418        def raise_error(*args, **kwargs):419            """raise ScriptHarnessTimeout"""420            if args or kwargs:  # silence pylint421                pass422            raise ScriptHarnessTimeout("foo")423        mock_subprocess.Popen = raise_error424        self.assertRaises(425            ScriptHarnessFatal, commands.get_text_output,426            "echo", halt_on_failure=True427        )428    @mock.patch('scriptharness.commands.subprocess')429    def test_no_halt(self, mock_subprocess):430        """test_commands | get_output() halt_on_error=False431        """432        def raise_error(*args, **kwargs):433            """raise ScriptHarnessTimeout"""434            if args or kwargs:  # silence pylint435                pass436            raise ScriptHarnessTimeout("foo")437        mock_subprocess.Popen = raise_error438        with commands.get_output("echo", halt_on_failure=False) as cmd:439            self.assertEqual(cmd.history['status'], status.TIMEOUT)440    def test_get_text_output(self):441        """test_commands | get_text_output()442        """443        output = commands.get_text_output(TEST_COMMAND)...test_core.py
Source:test_core.py  
# NOTE: reconstructed from a web extraction that had fused the gutter line
# numbers into the code and dropped every newline; the statements themselves
# are unchanged.
import unittest

from core import logic


class CoreTest(unittest.TestCase):
    """Tests for ``logic.parse_message`` using a single ``doge`` command."""

    # Command table handed to parse_message(): the key is the command name the
    # tests expect back, and "commands" lists the trigger words for it.
    default_test_command = {
        "doge_command": {
            "commands": [
                "doge"
            ]
        }
    }

    def test_parse_command_and_params(self):
        """A trigger word followed by text yields the command and the text."""
        test_message = "doge param"
        test_commands = self.default_test_command
        command, params = logic.parse_message(test_commands, test_message)
        self.assertEqual("doge_command", command)
        self.assertEqual("param", params)

    def test_parse_command_and_uppercase_params(self):
        """The case of the parameter text is expected to be preserved."""
        test_message = "doge Param"
        test_commands = self.default_test_command
        command, params = logic.parse_message(test_commands, test_message)
        self.assertEqual("doge_command", command)
        self.assertEqual("Param", params)

    def test_parse_uppercase_command_and_params(self):
        """A mixed-case trigger word is expected to match the command."""
        test_message = "DoGe param"
        test_commands = self.default_test_command
        command, params = logic.parse_message(test_commands, test_message)
        self.assertEqual("doge_command", command)
        self.assertEqual("param", params)

    def test_parse_command_without_params(self):
        """A bare trigger word is expected to yield empty-string params."""
        test_message = "doge"
        test_commands = self.default_test_command
        command, params = logic.parse_message(test_commands, test_message)
        self.assertEqual("doge_command", command)
        self.assertEqual("", params)

    def test_parse_nonexistent_command(self):
        """An unknown word is expected to yield ``None`` as the command."""
        test_message = "noone"
        test_commands = self.default_test_command
        command, params = logic.parse_message(test_commands, test_message)
        self.assertEqual(None, command)
        # ... (the source listing is truncated here; anything after this
        # assertion is not visible)

# case.py
Source:case.py  
# NOTE: reconstructed from a web extraction that had fused the gutter line
# numbers into the code and dropped every newline.
import unittest
from typing import Iterable, Optional

from dataunit.context import Context, get_global_context


class DataUnitTestCase(unittest.TestCase):
    """A class defining a single DataUnit tests case.

    This class is designed to be instantiated with a
    list of TestCommand instances which define the
    behavior of this tests case.

    :note: This class will be executed as a test case by PyCharm. It should pass
    due to an empty test_command list default.

    :param methodName: Name of the test method to run; forwarded unchanged to
        ``unittest.TestCase.__init__``.
    :param test_commands: List of TestCommand instances used to execute
        the tests case.  Every item must expose a ``run`` attribute.
    :param global_context: Context passed as ``parent`` when ``runTest``
        builds its own Context; may be ``None``.
    :raises ValueError: If ``test_commands`` is ``None`` or contains an item
        without a ``run`` attribute.
    """

    # noinspection PyPep8Naming
    def __init__(self, methodName='runTest', test_commands: Iterable = (),
                 global_context: Optional[Context] = None):
        # Validate Params
        if test_commands is None:
            raise ValueError('Parameter test_commands must not be None.')

        # Materialise once into a list owned by this instance.  The default is
        # an immutable tuple rather than ``[]`` so that no mutable default
        # object is shared between instances, and copying first means a
        # one-shot iterable is not exhausted by the validation loop below.
        commands = list(test_commands)
        for command in commands:
            if not hasattr(command, 'run'):
                raise ValueError('Parameter test_commands must be list of runnable objects')

        # Call unittest.TestCase.__init__() to setup default behavior.
        super().__init__(methodName)

        # Set attributes on self.
        self.test_commands = commands
        self.global_context = global_context

    def runTest(self):
        """Execute the actual tests case
        """
        test_context = Context(parent=self.global_context)
        for cmd in self.test_commands:
            ...  # loop body is truncated in the source listing

# Learn to execute automation testing from scratch with LambdaTest Learning
# Hub. Right from setting up the prerequisites to run your first automation
# test, to following best practices and diving deeper into advanced test
# scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to
# help you be proficient with different test automation frameworks i.e.
# Selenium, Cypress, TestNG etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!
