Best Python code snippet using stestr_python
test_binaries.py
Source:test_binaries.py  
...14    This is anything in the client/blackhat/bin folder (not including files in the installable folder)15    """16    def setUp(self) -> None:17        self.computer = init()18    def run_command(self, command, args=None):19        if args is None:20            args = []21        result = self.computer.run_command(command, args, True).data22        # This avoids "None type as no attribute strip"23        if not result:24            result = ""25        return result.strip("\n")26    def test_add_user(self):27        self.run_command("adduser", ["--version"])28        self.run_command("adduser", ["--help"])29        fail_bc_not_root_result = self.computer.run_command("adduser", ["testuser", "-p" "password", "-n"], True)30        expected_fail_bc_not_root_result = Result(success=False, message=ResultMessages.NOT_ALLOWED,31                                                  data="adduser: Only root can add new users!\n")32        self.assertEqual(fail_bc_not_root_result, expected_fail_bc_not_root_result)33        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))34        create_new_user_result = self.computer.run_command("adduser", ["testuser", "-p" "password", "-n"], True)35        expected_create_new_user_result = Result(success=True, message=None, data=None)36        create_duplicate_user_result = self.computer.run_command("adduser", ["testuser", "-p" "password", "-n"], True)37        expected_create_duplicate_user_result = Result(success=False, message=ResultMessages.ALREADY_EXISTS,38                                                       data="adduser: The user 'testuser' already exists.\n")39        self.assertEqual(create_new_user_result, expected_create_new_user_result)40        self.assertEqual(create_duplicate_user_result, expected_create_duplicate_user_result)41        # Now confirm that the user was actually added42        get_user_result = self.computer.get_user(username="testuser")43        expected_get_user_result = Result(success=True, message=None,44                                          data=User(1001, "testuser", md5("password".encode()).hexdigest()))45        self.assertEqual(get_user_result, expected_get_user_result)46    def test_base32(self):47        self.run_command("base32", ["--version"])48        self.run_command("base32", ["--help"])49        message = "Hello!"50        self.run_command("touch", ["file"])51        # We're writing with a new line to replicate what `echo hello! > file` would do52        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)53        base32_result = self.run_command("base32", ["file"]).split(" ")[1].strip("\n")54        self.assertEqual(b32decode(base32_result).decode().strip("\n"), message)55    def test_base64(self):56        self.run_command("base64", ["--version"])57        self.run_command("base64", ["--help"])58        message = "Hello!"59        self.run_command("touch", ["file"])60        # We're writing with a new line to replicate what `echo hello! > file` would do61        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)62        base64_result = self.run_command("base64", ["file"]).split(" ")[1]63        self.assertEqual(b64decode(base64_result).decode().strip("\n"), message)64    def test_cat(self):65        self.run_command("cat", ["--version"])66        self.run_command("cat", ["--help"])67        message = "Hello World!"68        self.run_command("touch", ["file"])69        self.computer.sys_write("/home/steve/file", message)70        cat_result = self.run_command("cat", ["file"])71        self.assertEqual(message, cat_result)72        # cat on directory should fail73        result = self.computer.run_command("cat", ["/home/steve"], True)74        self.assertIn("Is a directory", result.data)75        # cat respects permission76        result = self.computer.run_command("cat", ["/etc/shadow"], True)77        self.assertIn("Permission denied", result.data)78    def test_cd(self):79        self.run_command("cd", ["--version"])80        self.run_command("cd", ["--help"])81        self.assertEqual(self.computer.sys_getcwd().pwd(), "/home/steve")82        self.run_command("cd", [".."])83        self.assertEqual(self.computer.sys_getcwd().pwd(), "/home")84        self.run_command("cd", ["..."])85        self.assertEqual(self.computer.sys_getcwd().pwd(), "/")86        self.run_command("cd", ["/etc/skel/Desktop"])87        self.assertEqual(self.computer.sys_getcwd().pwd(), "/etc/skel/Desktop")88        self.run_command("cd", ["~"])89        self.assertEqual(self.computer.sys_getcwd().pwd(), "/home/steve")90    def test_chown(self):91        self.run_command("chown", ["--version"])92        self.run_command("chown", ["--help"])93        self.run_command("touch", ["testfile"])94        self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.owner, 1000)95        self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.group_owner, 1000)96        self.run_command("chown", ["root:root", "testfile"])97        self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.owner, 0)98        self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.group_owner, 0)99    def test_clear(self):100        self.run_command("clear", ["--version"])101        self.run_command("clear", ["--help"])102        self.run_command("clear")103    def test_commands(self):104        self.run_command("commands", ["--version"])105        self.run_command("commands", ["--help"])106        commands_result = self.run_command("commands")107        self.assertIn("ls", commands_result)108        # Now lets remove ls from /bin (it shouldn't show up in commands)109        # We need root permissions110        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))111        move_result = self.computer.run_command("mv", ["/bin/ls", "/tmp"], True)112        self.assertTrue(move_result.success)113        commands_result = self.run_command("commands")114        self.assertNotIn("ls", commands_result)115    def test_date(self):116        self.run_command("date", ["--version"])117        self.run_command("date", ["--help"])118        date_result = self.run_command("date")119        local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo120        time_first = datetime.datetime.now().strftime('%a %b %d %I:%M:%S %p')121        time_second = datetime.datetime.now().strftime('%Y')122        expected_result = f"{time_first} {local_timezone} {time_second}"123        self.assertEqual(date_result, expected_result)124    def test_echo(self):125        self.run_command("echo", ["--version"])126        self.run_command("echo", ["--help"])127        echo_result = self.computer.run_command("echo", ["hello!"], True).data128        self.assertEqual(echo_result, "hello!\n")129        # Test -e flag130        echo_e_flag_result = self.computer.run_command("echo", ["-e", "hello\n\tworld!"], True).data131        self.assertEqual(echo_e_flag_result, "hello\n\tworld!\n")132        # Test -n flag133        echo_n_flag_result = self.computer.run_command("echo", ["-n", "hello world!\n"], True).data134        self.assertEqual(echo_n_flag_result, "hello world!\n")135    def test_env(self):136        self.run_command("env", ["--version"])137        self.run_command("env", ["--help"])138        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("env"))139    def test_export(self):140        self.run_command("export", ["--version"])141        self.run_command("export", ["--help"])142        # Lets get a baseline for the env before-hand143        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))144        self.run_command("export", ["BASH=/bin/bash"])145        env_result = self.run_command("printenv")146        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve\nBASH=/bin/bash", env_result)147    def test_head(self):148        self.run_command("head", ["--version"])149        self.run_command("head", ["--help"])150        message = "one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten\neleven\ntwelve\nthirteen\nfourteen\nfifteen\nsixteen\nseventeen\neighteen\nninteen\ntwenty"151        # Make a file with 20 lines152        self.run_command("touch", ["bigfile"])153        self.computer.sys_write("/home/steve/bigfile", message)154        expected_result = "\n".join(message.split("\n")[:-10])155        actual_result = self.run_command("head", ["bigfile"])156        self.assertEqual(expected_result, actual_result)157        # TODO: Test --wrap flag158    def test_hostname(self):159        self.run_command("hostname", ["--version"])160        self.run_command("hostname", ["--help"])161        # We want to try to set the hostname without root permission (should fail)162        set_hostname_result = self.computer.run_command("hostname", ["localhost"], True)163        expected_set_hostname_result = Result(success=False,164                                              data="hostname: you must be root to change the host name")165        set_hostname_result.data = set_hostname_result.data.strip("\n")166        self.assertEqual(set_hostname_result, expected_set_hostname_result)167        # Make sure it failed168        get_hostname_result = self.run_command("hostname")169        self.assertNotEqual(get_hostname_result, "localhost")170        # Switch to root then change hostname171        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))172        self.run_command("hostname", ["localhost"])173        get_hostname_result = self.run_command("hostname")174        self.assertEqual(get_hostname_result, "localhost")175    def test_id(self):176        self.run_command("id", ["--version"])177        self.run_command("id", ["--help"])178        self_id_result = self.run_command("id")179        expected_self_id_result = "uid=1000(steve) gid=1000(steve) "180        self.assertEqual(self_id_result, expected_self_id_result)181        root_id_result = self.run_command("id", ["root"])182        root_id_by_uid_result = self.run_command("id", ["0"])183        expected_root_result = "uid=0(root) gid=0(root) "184        self.assertEqual(root_id_result, expected_root_result)185        self.assertEqual(root_id_by_uid_result, expected_root_result)186        # Check the flags187        id_result = self.run_command("id", ["-g"])188        self.assertEqual(id_result, "1000")189        id_result = self.run_command("id", ["-G"])190        self.assertEqual(id_result, "1000")191        id_result = self.run_command("id", ["-n"])192        self.assertEqual(id_result, "steve")193    def test_ls(self):194        import re195        self.run_command("ls", ["--version"])196        self.run_command("ls", ["--help"])197        # In the user's home directory, The files should be: Desktop Documents Downloads Music Pictures Public Templates Videos198        ls_result = self.computer.run_command("ls", ["--no-color"], True)199        expected_ls_result = Result(success=True, message=None,200                                    data="Desktop Documents Downloads Music Pictures Public Templates Videos ")201        ls_result.data = ls_result.data.strip("\n")202        color_filter = re.compile(r'\x1b[^m]*m')203        stripped_color = color_filter.sub('', ls_result.data)204        self.assertEqual(stripped_color, expected_ls_result.data)205        # Make sure .shellrc isn't there (as -a isn't specified)206        self.assertNotIn(".shellrc", ls_result.data)207        # Test -a flag (show hidden files)208        ls_result = self.run_command("ls", ["--no-color", "-a"])209        self.assertIn(".shellrc", ls_result)210        # Make sure -l flag works211        ls_result = self.run_command("ls", ["--no-color", "-l"])212        self.assertIn("steve", ls_result)213    def test_md5sum(self):214        self.run_command("md5sum", ["--version"])215        self.run_command("md5sum", ["--help"])216        message = "hello!"217        self.run_command("touch", ["file"])218        # We're writing with a new line to replicate what `echo hello! > file` would do219        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)220        self.assertIn(md5((message + "\n").encode()).hexdigest(), self.run_command("md5sum", ["file"]))221        self.assertIn(md5(message.encode()).hexdigest(), self.run_command("md5sum", ["file", "-z"]))222        self.assertEqual(self.run_command("md5sum", ["file", "-z", "--tag"]),223                         f"MD5 (file) = {md5(message.encode()).hexdigest()}")224    def test_mv(self):225        self.run_command("mv", ["--version"])226        self.run_command("mv", ["--help"])227        # Make sure /bin/pwd exists228        find_pwd = self.computer.fs.find("/bin/pwd")229        self.assertTrue(find_pwd.success)230        # Make sure permissions work when moving231        self.assertFalse(self.computer.run_command("mv", ["/bin/pwd", "/tmp"], True).success)232        # Make sure /bin/pwd still exists233        find_pwd = self.computer.fs.find("/bin/pwd")234        self.assertTrue(find_pwd.success)235        # Make sure /tmp/pwd DOESN'T exists236        find_pwd = self.computer.fs.find("/tmp/pwd")237        self.assertFalse(find_pwd.success)238        # Root perms!239        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))240        self.run_command("mv", ["/bin/pwd", "/tmp"])241        # Make sure /bin/pwd DOESN'T exists242        find_pwd = self.computer.fs.find("/bin/pwd")243        self.assertFalse(find_pwd.success)244        # Make sure /tmp/pwd exists245        find_pwd = self.computer.fs.find("/tmp/pwd")246        self.assertTrue(find_pwd.success)247    def test_passwd(self):248        self.run_command("passwd", ["--version"])249        self.run_command("passwd", ["--help"])250        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))251        self.computer.run_command("adduser", ["testuser", "-p" "tester", "-n"], True)252        self.computer.sessions.pop()253        # su to sanity check password254        with unittest.mock.patch("getpass.getpass", return_value="tester"):255            with unittest.mock.patch("getpass.fallback_getpass", return_value="tester"):256                self.run_command("su", ["testuser"])257                self.assertEqual("testuser", self.run_command("whoami"))258                self.computer.sessions.pop()259                # Change testuser's password260                print(self.run_command("passwd", ["testuser", "-p", "newpassword"]))261                # Confirm the old password doesn't work262                result = self.run_command("su", ["testuser"])263                self.assertIn("Authentication failure", result)264                # A valid password is required to reset the user's password265                result = self.run_command("passwd", ["testuser", "-p", "newnewpassword"])266                self.assertIn("Authentication token manipulation error", result)267        # Confirm the new password works268        with unittest.mock.patch("getpass.getpass", return_value="newpassword"):269            with unittest.mock.patch("getpass.fallback_getpass", return_value="newpassword"):270                self.run_command("su", ["testuser"])271                self.assertEqual("testuser", self.run_command("whoami"))272                self.computer.sessions.pop()273    def test_printenv(self):274        self.run_command("printenv", ["--version"])275        self.run_command("printenv", ["--help"])276        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))277    def test_pwd(self):278        self.run_command("pwd", ["--version"])279        self.run_command("pwd", ["--help"])280        pwd_result = self.run_command("pwd")281        self.assertEqual("/home/steve", pwd_result)282    def test_rm(self):283        self.run_command("rm", ["--version"])284        self.run_command("rm", ["--help"])285        # Make some test files to remove286        self.run_command("touch", ["file1", "file2", "file3", "/tmp/file4"])287        ls_result = self.run_command("ls")288        # Sanity check289        for file in ["file1", "file2", "file3"]:290            self.assertIn(file, ls_result)291        # rm a single file (relative path)292        self.run_command("rm", ["file1"])293        ls_result = self.run_command("ls")294        self.assertNotIn("file1", ls_result)295        # rm a file (absolute path)296        self.run_command("rm", ["/tmp/file4"])297        ls_result = self.run_command("ls", ["/tmp"])298        self.assertNotIn("file4", ls_result)299        # TODO: Allow multiple files to be deleted and then add test300        # Delete non-existent file301        rm_doesnt_exist = self.computer.run_command("rm", ["/tmp/badfile"], True)302        self.assertFalse(rm_doesnt_exist.success)303        self.assertIn("No such file or directory", rm_doesnt_exist.data)304        # No rm permissions305        rm_no_perms = self.computer.run_command("rm", ["/etc/passwd"], False)306        self.assertFalse(rm_no_perms.success)307        self.assertIn("Permission denied", rm_no_perms.data)308        # Confirm it's still there309        self.assertTrue(self.computer.run_command("stat", ["/etc/passwd"], True).success)310        # Can't remove populated directory311        self.run_command("mkdir", ["files"])312        self.run_command("cd", ["files"])313        self.run_command("touch", ["file1", "file2"])314        self.run_command("cd", [".."])315        rm_full_dir = self.computer.run_command("rm", ["files"], True)316        self.assertFalse(rm_full_dir.success)317        self.assertIn("Is a directory", rm_full_dir.data)318        # Confirm it's still there319        self.assertTrue(self.computer.run_command("stat", ["files"], True).success)320        # Remove directory with -r321        rm_dir_with_flag = self.computer.run_command("rm", ["-r", "files"], True)322        self.assertTrue(rm_dir_with_flag.success)323        # Confirm324        self.assertFalse(self.computer.run_command("stat", ["files"], True).success)325        # rm *326        self.run_command("mkdir", ["files"])327        self.run_command("cd", ["files"])328        self.run_command("touch", ["file1", "file2"])329        rm_star = self.computer.run_command("rm", ["*"], True)330        self.assertTrue(rm_star.success)331        ls_result = self.run_command("ls")332        self.assertNotIn("file1", ls_result)333        self.assertNotIn("file2", ls_result)334    def test_sha1sum(self):335        self.run_command("sha1sum", ["--version"])336        self.run_command("sha1sum", ["--help"])337        message = "hello!"338        self.run_command("touch", ["file"])339        # We're writing with a new line to replicate what `echo hello! > file` would do340        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)341        self.assertIn(sha1((message + "\n").encode()).hexdigest(), self.run_command("sha1sum", ["file"]))342        self.assertIn(sha1(message.encode()).hexdigest(), self.run_command("sha1sum", ["file", "-z"]))343        self.assertEqual(self.run_command("sha1sum", ["file", "-z", "--tag"]),344                         f"SHA1 (file) = {sha1(message.encode()).hexdigest()}")345    def test_sha224sum(self):346        self.run_command("sha224sum", ["--version"])347        self.run_command("sha224sum", ["--help"])348        message = "hello!"349        self.run_command("touch", ["file"])350        # We're writing with a new line to replicate what `echo hello! > file` would do351        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)352        self.assertIn(sha224((message + "\n").encode()).hexdigest(),353                      self.run_command("sha224sum", ["file"]))354        self.assertIn(sha224(message.encode()).hexdigest(),355                      self.run_command("sha224sum", ["file", "-z"]))356        self.assertEqual(self.run_command("sha224sum", ["file", "-z", "--tag"]),357                         f"SHA224 (file) = {sha224(message.encode()).hexdigest()}")358    def test_sha256sum(self):359        self.run_command("sha256sum", ["--version"])360        self.run_command("sha256sum", ["--help"])361        message = "hello!"362        self.run_command("touch", ["file"])363        # We're writing with a new line to replicate what `echo hello! > file` would do364        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)365        self.assertIn(sha256((message + "\n").encode()).hexdigest(),366                      self.run_command("sha256sum", ["file"]))367        self.assertIn(sha256(message.encode()).hexdigest(),368                      self.run_command("sha256sum", ["file", "-z"]))369        self.assertEqual(self.run_command("sha256sum", ["file", "-z", "--tag"]),370                         f"SHA256 (file) = {sha256(message.encode()).hexdigest()}")371    def test_sha384sum(self):372        self.run_command("sha384sum", ["--version"])373        self.run_command("sha384sum", ["--help"])374        message = "hello!"375        self.run_command("touch", ["file"])376        # We're writing with a new line to replicate what `echo hello! > file` would do377        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)378        self.assertIn(sha384((message + "\n").encode()).hexdigest(),379                      self.run_command("sha384sum", ["file"]))380        self.assertIn(sha384(message.encode()).hexdigest(),381                      self.run_command("sha384sum", ["file", "-z"]))382        self.assertEqual(self.run_command("sha384sum", ["file", "-z", "--tag"]),383                         f"SHA384 (file) = {sha384(message.encode()).hexdigest()}")384    def test_sha512sum(self):385        self.run_command("sha512sum", ["--version"])386        self.run_command("sha512sum", ["--help"])387        message = "hello!"388        self.run_command("touch", ["file"])389        # We're writing with a new line to replicate what `echo hello! > file` would do390        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)391        self.assertIn(sha512((message + "\n").encode()).hexdigest(),392                      self.run_command("sha512sum", ["file"]))393        self.assertIn(sha512(message.encode()).hexdigest(),394                      self.run_command("sha512sum", ["file", "-z"]))395        self.assertEqual(self.run_command("sha512sum", ["file", "-z", "--tag"]),396                         f"SHA512 (file) = {sha512(message.encode()).hexdigest()}")397    def test_su(self):398        self.run_command("su", ["--version"])399        self.run_command("su", ["--help"])400        self.run_command("su", ["--version"])401        # Sanity check402        self.assertIn("uid=1000(steve) gid=1000(steve)", self.run_command("id"))403        with unittest.mock.patch("getpass.getpass", return_value="password"):404            with unittest.mock.patch("getpass.fallback_getpass", return_value="password"):405                self.run_command("su")406                self.assertIn("uid=0(root) gid=0(root)", self.run_command("id"))407        self.assertTrue(self.computer.run_command("adduser", ["testuser", "-p" "NOTPASSWORD", "-n"], True).success)408        # Now that we're root, we should be able to change user without a password409        self.run_command("su", ["testuser"])410        self.assertIn("uid=1001(testuser) gid=1001(testuser)", self.run_command("id"))411    def test_sudo(self):412        self.run_command("sudo", ["--version"])413        self.run_command("sudo", ["--version"])414        # Sanity check415        sudo_result = self.run_command("id")416        self.assertIn("uid=1000(steve) gid=1000(steve)", sudo_result)417        # Add a new user to test later418        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))419        self.run_command("adduser", ["testuser", "-p" "password", "-n"])420        self.computer.sessions.pop()421        # We do both "getpass" and "fallback_getpass" because some terminals422        # (pycharm's one for example) uses the fallback function423        with unittest.mock.patch("getpass.getpass", return_value="password"):424            with unittest.mock.patch("getpass.fallback_getpass", return_value="password"):425                sudo_root_result = self.run_command("sudo", ["id"])426                self.assertIn("uid=0(root) gid=0(root)", sudo_root_result)427                other_user_result = self.run_command("sudo", ["-u", "testuser", "id"])428                self.assertIn("uid=1001(testuser) gid=1001(testuser)", other_user_result)429    def test_tail(self):430        self.run_command("tail", ["--version"])431        self.run_command("tail", ["--help"])432        message = "one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten\neleven\ntwelve\nthirteen\nfourteen\nfifteen\nsixteen\nseventeen\neighteen\nninteen\ntwenty"433        # Make a file with 20 lines434        self.run_command("touch", ["bigfile"])435        self.computer.sys_write("/home/steve/bigfile", message)436        expected_result = "\n".join(message.split("\n")[10:])437        actual_result = self.run_command("tail", ["bigfile"])438        self.assertEqual(expected_result, actual_result)439        # TODO: Test --wrap flag440    def test_touch(self):441        self.run_command("touch", ["--version"])442        self.run_command("touch", ["--help"])443        touch_result = self.computer.run_command("touch", ["testfile"], True)444        expected_touch_result = Result(success=True)445        # We should double-check that the file exists in our home directory446        self.assertIn("testfile", self.run_command("ls"))447        self.assertEqual(touch_result, expected_touch_result)448    def test_uname(self):449        self.run_command("uname", ["--version"])450        self.run_command("uname", ["--help"])451        # Test some misc uname configurations452        self.assertEqual(self.run_command("uname"), "Linux")453        self.assertEqual(self.run_command("uname", ["-a"]),454                         f"Linux {self.computer.hostname} 1.1 v1 x86_64 Blackhat/Linux")455        self.assertEqual(self.run_command("uname", ["-mns"]),456                         f"Linux {self.computer.hostname} x86_64 ")457        self.assertEqual(self.run_command("uname", ["-op", "-i"]),458                         "unknown unknown Blackhat/Linux ")459    def test_unset(self):460        self.run_command("unset", ["--version"])461        self.run_command("unset", ["--help"])462        # Lets get a baseline for the env before-hand463        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))464        self.run_command("export", ["BASH=/bin/bash"])465        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve\nBASH=/bin/bash",466                         self.run_command("printenv"))467        self.run_command("unset", ["BASH"])468        self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))469    def test_uptime(self):470        self.run_command("uptime", ["--version"])471        self.run_command("uptime", ["--help"])472        # Make sure it doesn't crash473        uptime_result = self.run_command("uptime")474        # TODO: See if there's a better way to fix this without wasting 5 seconds475        self.assertEqual(uptime_result, "uptime: 0:00:00")476        sleep(5)477        uptime_result = self.run_command("uptime")478        self.assertEqual(uptime_result, "uptime: 0:00:05")479        uptime_result = self.run_command("uptime", ["-p"])480        self.assertEqual(uptime_result, "up 0 minutes")481        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")482        uptime_result = self.run_command("uptime", ["-s"])483        # I'm going to strip out the seconds because different speed test runners can take different amount of times484        # to run485        now = ":".join(now.split(":")[:-1])486        uptime_result = ":".join(uptime_result.split(":")[:-1])487        # We have to check +1 or -1 minute because there is a possibility that it can be one minute behind488        minus_one_minute = uptime_result.split(":")489        minus_one_minute[1] = "{:02d}".format(int(minus_one_minute[1]) - 1)490        minus_one_minute = ":".join(minus_one_minute)491        plus_one_minute = uptime_result.split(":")492        plus_one_minute[1] = "{:02d}".format(int(plus_one_minute[1]) + 1)493        plus_one_minute = ":".join(plus_one_minute)494        self.assertIn(now, [minus_one_minute, uptime_result, plus_one_minute])495    def test_users(self):496        self.run_command("users", ["--version"])497        self.run_command("users", ["--help"])498        # One session = only one user499        users_result = self.run_command("users")500        self.assertEqual(users_result, "steve ")501        # Add new sessions then test502        self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))503        users_result = self.run_command("users")504        self.assertEqual(users_result, "steve root ")505        self.computer.sessions.append(Session(1000, self.computer.fs.files, self.computer.sessions[-1].id + 1))506        users_result = self.run_command("users")507        self.assertEqual(users_result, "steve root steve ")508    def test_wc(self):509        self.run_command("wc", ["--version"])510        self.run_command("wc", ["--help"])511        message = "hello!"512        message_len = len(message + "\n")513        self.run_command("touch", ["file"])514        # We're writing with a new line to replicate what `echo hello! > file` would do515        self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)516        wc_result = self.run_command("wc", ["file"])517        wc_split_results = wc_result.split(" ")518        while "" in wc_split_results:519            wc_split_results.remove("")520        total_new_line_count, total_word_count, total_byte_count = wc_split_results[0], wc_split_results[1], \521                                                                   wc_split_results[2]522        self.assertEqual(int(total_byte_count), message_len)523        self.assertEqual(int(total_new_line_count), 1)524        self.assertEqual(int(total_word_count), 1)525        wc_result = self.run_command("wc", ["file", "-c"])526        wc_split_results = wc_result.split(" ")527        while "" in wc_split_results:528            wc_split_results.remove("")529        total_byte_count = wc_split_results[0]530        self.assertEqual(int(total_byte_count), message_len)531        wc_result = self.run_command("wc", ["file", "-m"])532        wc_split_results = wc_result.split(" ")533        while "" in wc_split_results:534            wc_split_results.remove("")535        total_byte_count = wc_split_results[0]536        self.assertEqual(int(total_byte_count), message_len)537    def test_who(self):538        self.run_command("who", ["--version"])539        self.run_command("who", ["--help"])540        self.assertEqual(self.run_command("who"), "steve\tpts/0")541    def test_whoami(self):542        self.run_command("whoami", ["--version"])543        self.run_command("whoami", ["--help"])544        self.assertEqual(self.run_command("whoami"), "steve")545class TestInstallableBinaries(unittest.TestCase):546    """547    These test cases test the functionality of the binaries that are NOT included with the system548    (anything installable through apt)549    This is anything in the client/blackhat/bin/installable folder550    """551    def setUp(self) -> None:552        self.computer = init()553    def run_command(self, command, args=None):554        if args is None:555            args = []556        result = self.computer.run_command(command, args, True).data557        # This avoids "None type as no attribute strip"558        if not result:559            result = ""...setup.py
Source:setup.py  
...6        # log_msg(f"###### Setup iSCSI target {id}")7        # log_msg(ISCSI_TARGET_IMGS)8        TGT_BS=ISCSI_TARGET_IMGS[id]9        ISCSI_TARGET_NAME=ISCSI_TARGET_NAMES[id]10        run_command(f"mkdir -p {IMG_DIR}")11        run_command(f"fallocate -l {DUMMY_DISK_SIZE} {TGT_BS}")12        run_command(f"sudo {PFE_USR_DIR}/tgtadm  --lld iscsi --op new --mode target --tid {id} -T {ISCSI_TARGET_NAME}")13        run_command(f"sudo {PFE_USR_DIR}/tgtadm  --lld iscsi --op new --mode logicalunit --tid {id} --lun 1 -b {TGT_BS}")14        run_command(f"sudo {PFE_USR_DIR}/tgtadm   --lld iscsi --op bind --mode target --tid {id} -I ALL")15    # run_command(f"sudo {PFE_USR_DIR}/tgtadm   --lld iscsi --op show --mode target")16def set_initiator(ids):17    log_msg(f" ==== Starting iSCSI initiator ====", emphasize=True)18    run_command(f"sudo service open-iscsi stop && sudo service open-iscsi start", suppress_output=True)19    run_command(f"sudo iscsiadm --mode discovery --type sendtargets --portal {ISCSI_TARGET_IP}")20    for id in ids:21        ISCSI_TARGET_NAME=ISCSI_TARGET_NAMES[id]22        run_command(f"sudo iscsiadm --mode node --targetname {ISCSI_TARGET_NAME} --portal {ISCSI_TARGET_IP}:3260 --login")23def run_tgtd():24    log_msg(f" ==== Starting iSCSI daemon ====", emphasize=True)25    run_command(f"mkdir -p {PFE_LOG_DIR}")26    run_command(f"sudo rm {IO_HEAD_LOG} {IO_DATA_LOG} {IO_HEAD_SPLIT_LOG} {ERR_BLOCK_LOG} {TGTD_LOG}")27    run_command(f"touch {IO_HEAD_LOG} {IO_DATA_LOG} {IO_HEAD_SPLIT_LOG} {ERR_BLOCK_LOG} {TGTD_LOG}")28    run_command(f"sudo chmod 666 {IO_HEAD_LOG} {IO_DATA_LOG} {IO_HEAD_SPLIT_LOG} {ERR_BLOCK_LOG} {TGTD_LOG}")29    # Run tgtd30    run_command(f"sudo {PFE_USR_DIR}/tgtd -f --pfe-io-header-log {IO_HEAD_LOG} --pfe-fail-type-tgtd 0 --pfe-err-blk {ERR_BLOCK_LOG} --pfe-io-data-log {IO_DATA_LOG} --pfe-enable-record 0 >  {TGTD_LOG} 2>&1 &")31    # run_command(f"sudo {PFE_USR_DIR}/tgtd --pfe-io-header-log {IO_HEAD_LOG} --pfe-fail-type-tgtd 0 --pfe-err-blk {ERR_BLOCK_LOG} --pfe-io-data-log {IO_DATA_LOG} --pfe-enable-record 0 >  {TGTD_LOG} 2>&1 ")32def kill_tgtd():33    log_msg(f" ==== Killing iSCSI daemon ====", warning=True)34    run_command("sudo killall -9 tgtd")35def clean_logs():36    log_msg(f" ==== Cleaning log files ====", warning=True)37    #LOGS = [IO_HEAD_LOG, IO_DATA_LOG, IO_HEAD_SPLIT_LOG, ERR_BLOCK_LOG, TGTD_LOG]38    LOGS = [IO_HEAD_LOG, IO_DATA_LOG, IO_HEAD_SPLIT_LOG, ERR_BLOCK_LOG]39    for LOG in LOGS:40        if os.path.exists(LOG):41            run_command(f"cat /dev/null > {LOG}")42def start_log():43    run_command(f"touch {PFE_USR_DIR}/start_log")44def stop_log():45    run_command(f"rm -f {PFE_USR_DIR}/start_log")46def start_replay():47    run_command(f"touch {PFE_USR_DIR}/start_replay")48def stop_replay():49    run_command(f"rm -f {PFE_USR_DIR}/start_replay")50def reset_queues():51    log_msg(f" ==== Resetting message queues ====", warning=True)52    for queue in QUEUES:53        assert(queue != "" and queue != "/")54        run_command(f"rm -f {queue}")55        run_command(f"touch {queue}")56def reset_flags():57    log_msg(f" ==== Resetting flags ====", warning=True)58    run_command(f"rm -f {HARVEST_FLAG}")59    run_command(f"rm -f {END_FLAG}")60def clear_queues():61    log_msg(f" ==== Clearing message queues ====", warning=True)62    run_command(f"sudo ./mqueue_cleaner")63    run_command(f"sudo ./mqueue_cleaner")64def obtain_disk_id(ids):65    disk_ids = []66    raw_output = run_command("sudo lsblk -d", output=True)67    raw_output = raw_output['stdout']68    for line in raw_output.strip().split("\n"):69        line = list(filter(lambda x: len(x) > 0, line.strip().split(" ")))70        if line[3] == DUMMY_DISK_SIZE_STRING:71            disk_ids.append(line[0])72    # FIXME: find a way to restart the script instead of exit73    assert(len(disk_ids) == len(ids))74        75    log_msg("Virtual disk ids", disk_ids)76    return disk_ids77def kill_vms(vms):78    log_msg(f" ==== Killing VMs ====", warning=True)79    for vm in vms:80        run_command(f"sudo virsh shutdown {vm}")81    # check if all vms are shutdown  82    while True:83        raw_output = run_command("sudo virsh list --all --state-shutoff", output=True)84        raw_output = raw_output['stdout']85        all_shutdown = True86        for vm in vms:87            if vm not in raw_output:88                all_shutdown = False89                break90        if not all_shutdown:91            time.sleep(3)92        else:93            break94def start_vms(vms):95    log_msg(f" ==== Starting VMs ====", emphasize=True)96    for vm in vms:97        run_command(f"sudo virsh start {vm}")98    # check if all vms are running99    while True:100        raw_output = run_command("sudo virsh list --all --state-running", output=True)101        raw_output = raw_output['stdout']102        all_running = True103        for vm in vms:104            if vm not in raw_output:105                all_running = False106                break107        if not all_running:108            time.sleep(3)109        else:110            break111    112    # test ssh connection113    log_msg("Waiting for ssh connections (may take a while)")114    for vm in vms:115        while True:116            output = run_command(f"ssh {vm} 'date'", error=True, verbose=False)117            output = output['stderr']118            if "No route to host" in output:119                time.sleep(3)120            else:121                break122def attach_device(disk_ids, vms):123    log_msg(f" ==== Attaching VM devices ====", emphasize=True)124    for i, vm in enumerate(vms):125        run_command(f"sudo virsh attach-disk {vm} /dev/{disk_ids[i]} --target vdb")126def detach_device(vms):127    log_msg(f" ==== Detaching VM disks ====", warning=True)128    raw_output = run_command("sudo virsh list --all --state-running", output=True)129    raw_output = raw_output['stdout']130    for vm in vms:131        if vm not in raw_output:132            continue133        raw_output = run_command(f"ssh {vm} 'ls /dev/vd*'", output=True)134        raw_output = raw_output['stdout']135        ## FIXME: by default we assume the last disk in VM is the virtual disk 136        disk_id = raw_output.strip().split("\n")[-1].split("/")[-1]137        ## we only find the default disk -- no additional virtual disks are attached yet138        if "vda" in disk_id:139            continue140        run_command(f"sudo virsh detach-disk {vm} --target {disk_id}")141def run_workloads(with_vm, vms, workload="terasort", mode="harvest"):142    log_msg(f" ==== Running workload {workload} with {mode} mode ====", emphasize=True)143    if with_vm:144        run_command(f"ssh {vms[0]} 'sudo python3 workload_parser.py {workload} {workload}.out' &")145        if mode == "harvest":146            while not os.path.exists(HARVEST_FLAG):147                time.sleep(5)148            run_command(f"ssh {vms[1]} 'sudo python3 workload_parser.py ycsb ycsb.out ' &")149    else:150        run_command(f"sudo python3 workload.py {workload} {USR_DIR}/logs/{workload}.out &")151        if mode == "harvest":152            while not os.path.exists(HARVEST_FLAG):153                time.sleep(5)154            log_msg(f" ==== Running regular workload ycsb ====", emphasize=True)155            run_command(f"sudo python3 workload.py ycsb {USR_DIR}/logs/ycsb.out &")156def stop_workloads(with_vm, vms):157    log_msg(f" ==== Stopping workloads ====", warning=True)158    if with_vm:159        raw_output = run_command("sudo virsh list --all --state-running", output=True)160        raw_output = raw_output['stdout']161        for vm in vms:162            if vm not in raw_output:163                continue164            run_command("ssh "+vm+" \"ps ax | grep 'python3 workload_parser.py' | awk '{ print \$1 }' | xargs sudo kill -9\" ")165            run_command("ssh "+vm+" \"ps ax | grep 'python3 joblib' | awk '{ print \$1 }' | xargs sudo kill -9\" ")166    else:167        run_command("ps ax | grep 'python3 workload.py' | awk '{ print $1 }' | xargs sudo kill -9")168        run_command("ps ax | grep 'python3 joblib' | awk '{ print $1 }' | xargs sudo kill -9")169def stop_ocssd():170    log_msg(f" ==== Stopping blockflex ====", warning=True)171    run_command("sudo killall harvest")172def run_ocssd(workload="terasort", mode="harvest"):173    log_msg(f" ==== Running blockflex ====", emphasize=True)174    OUT_DIR = f"{USR_DIR}/results/{workload}"175    if not os.path.exists(OUT_DIR):176        os.mkdir(OUT_DIR)177        run_command(f"sudo chown $USER {OUT_DIR}")178    if mode == "static":179        run_command(f"sudo ../blockflex/harvest 0 0 > {OUT_DIR}/no_harvest_bw.out &")180    elif mode == "harvest":181        run_command(f"sudo ../blockflex/harvest 4 4 > {OUT_DIR}/harvest_bw.out &")182    elif mode == "unsold":183        run_command(f"sudo ../blockflex/harvest 0 4 > {OUT_DIR}/unsold_harvest_bw.out &")184def run_predictor():185    log_msg(f" ==== Running predictor ====", emphasize=True)186    run_command(f"echo '0' > {BLOCKFLEX_DIR}/bw_inputs.txt")187    run_command(f"echo '0' > {BLOCKFLEX_DIR}/sz_inputs.txt")188    run_command(f"sudo python3 ../blockflex/pred.py {USR_DIR}/logs/predictor.out &")189def stop_predictor():190    log_msg(f" ==== Stopping predictor ====", warning=True)191    run_command("ps ax | grep 'pred.py' | awk '{ print $1 }' | xargs sudo kill -9")192if __name__ == "__main__":193    ids = [1,2]194    vms = ["vm0", "vm1"]195    VERBOSE = True196    WITH_VM = False197    running_mode = 'regular'198    if len(sys.argv) >= 2:199        running_mode = sys.argv[1]200    stop_predictor()201    stop_workloads(WITH_VM, vms)202    stop_ocssd()203    clear_queues()204    if WITH_VM:205        detach_device(vms)...clear_cursors_carets_last_selection_unit_tests.py
Source:clear_cursors_carets_last_selection_unit_tests.py  
...16class ClearCursorsCaretsLastSelectionUnitTests(utilities.BasicSublimeTextViewTestCase):1718    def test_1_selections_at_first_word(self):19        self.create_test_text(0)20        self.view.window().run_command( "find_under_expand" )21        self.view.window().run_command( "single_selection_last" )2223        region = self.view.sel()[0]24        self.assertEqual( sublime.Region(0, 4), region )2526    def test_2_selections_at_first_word(self):27        self.create_test_text(0)28        self.view.window().run_command( "find_under_expand" )29        self.view.window().run_command( "find_under_expand" )30        self.view.window().run_command( "single_selection_last" )3132        region = self.view.sel()[0]33        self.assertEqual( sublime.Region(5, 9), region )3435    def test_6_selections_at_first_word(self):36        self.create_test_text(0)37        self.view.window().run_command( "find_under_expand" )38        self.view.window().run_command( "find_under_expand" )39        self.view.window().run_command( "find_under_expand" )40        self.view.window().run_command( "find_under_expand" )41        self.view.window().run_command( "find_under_expand" )42        self.view.window().run_command( "find_under_expand" )43        self.view.window().run_command( "single_selection_last" )4445        region = self.view.sel()[0]46        self.assertEqual( sublime.Region(26, 30), region )4748    def test_6_selections_plus_redundant_expand_at_first_word(self):49        self.create_test_text(0)50        self.view.window().run_command( "find_under_expand" )51        self.view.window().run_command( "find_under_expand" )52        self.view.window().run_command( "find_under_expand" )53        self.view.window().run_command( "find_under_expand" )54        self.view.window().run_command( "find_under_expand" )55        self.view.window().run_command( "find_under_expand" )56        self.view.window().run_command( "single_selection_last" )5758        region = self.view.sel()[0]59        self.assertEqual( sublime.Region(26, 30), region )6061    def test_2_selections_at_last_word(self):62        self.create_test_text(26)63        self.view.window().run_command( "find_under_expand" )64        self.view.window().run_command( "find_under_expand" )65        self.view.window().run_command( "single_selection_last" )6667        region = self.view.sel()[0]68        self.assertEqual( sublime.Region(0, 4), region )6970    def test_3_selections_at_last_word(self):71        self.create_test_text(26)72        self.view.window().run_command( "find_under_expand" )73        self.view.window().run_command( "find_under_expand" )74        self.view.window().run_command( "find_under_expand" )75        self.view.window().run_command( "single_selection_last" )7677        region = self.view.sel()[0]78        self.assertEqual( sublime.Region(5, 9), region )7980    def test_6_selections_at_last_word(self):81        self.create_test_text(26)82        self.view.window().run_command( "find_under_expand" )83        self.view.window().run_command( "find_under_expand" )84        self.view.window().run_command( "find_under_expand" )85        self.view.window().run_command( "find_under_expand" )86        self.view.window().run_command( "find_under_expand" )87        self.view.window().run_command( "find_under_expand" )88        self.view.window().run_command( "single_selection_last" )8990        region = self.view.sel()[0]91        self.assertEqual( sublime.Region(21, 25), region )9293    def test_6_selections_at_last_word_plus_redundant_expansion_at_last_word(self):94        self.create_test_text(26)95        self.view.window().run_command( "find_under_expand" )96        self.view.window().run_command( "find_under_expand" )97        self.view.window().run_command( "find_under_expand" )98        self.view.window().run_command( "find_under_expand" )99        self.view.window().run_command( "find_under_expand" )100        self.view.window().run_command( "find_under_expand" )101        self.view.window().run_command( "find_under_expand" )102        self.view.window().run_command( "single_selection_last" )103104        region = self.view.sel()[0]105        self.assertEqual( sublime.Region(21, 25), region )
...set_ro.py
Source:set_ro.py  
...3import subprocess4import sys5logging.basicConfig(level=logging.DEBUG)6log = logging.getLogger()7def run_command(args, except_on_error=True):8    log.debug('running command "%s"', ' '.join(args))9    proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)10    out, err = proc.communicate()11    if out:12        log.debug('stdout: %s', out)13    if err:14        log.debug('stderr: %s', err)15    if proc.returncode:16        log.debug('ret: %d', proc.returncode)17        if except_on_error:18            raise subprocess.CalledProcessError(proc.returncode, ' '.join(args))19    return (proc.returncode, out, err)20def setup(image_name):21    run_command(['rbd', 'create', '-s', '100', image_name])22    run_command(['rbd', 'snap', 'create', image_name + '@snap'])23    run_command(['rbd', 'map', image_name])24    run_command(['rbd', 'map', image_name + '@snap'])25def teardown(image_name, fail_on_error=True):26    run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name + '@snap'], fail_on_error)27    run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name], fail_on_error)28    run_command(['rbd', 'snap', 'rm', image_name + '@snap'], fail_on_error)29    run_command(['rbd', 'rm', image_name], fail_on_error)30def write(target, expect_fail=False):31    try:32        with open(target, 'w', 0) as f:33            f.write('test')34            f.flush()35        assert not expect_fail, 'writing should have failed'36    except IOError:37        assert expect_fail, 'writing should not have failed'38def test_ro(image_name):39    dev = '/dev/rbd/rbd/' + image_name40    snap_dev = dev + '@snap'41    log.info('basic device is readable')42    write(dev)43    log.info('basic snapshot is read-only')44    write(snap_dev, True)45    log.info('cannot set snapshot rw')46    ret, _, _ = run_command(['blockdev', '--setrw', snap_dev], False)47    assert ret != 0, 'snapshot was set read-write!'48    run_command(['udevadm', 'settle'])49    write(snap_dev, True)50    log.info('set device ro')51    run_command(['blockdev', '--setro', dev])52    run_command(['udevadm', 'settle'])53    write(dev, True)54    log.info('cannot set device rw when in-use')55    with open(dev, 'r') as f:56        ret, _, _ = run_command(['blockdev', '--setro', dev], False)57        assert ret != 0, 'in-use device was set read-only!'58        run_command(['udevadm', 'settle'])59    write(dev, True)60    run_command(['blockdev', '--setro', dev])61    run_command(['udevadm', 'settle'])62    write(dev, True)63    run_command(['blockdev', '--setrw', dev])64    run_command(['udevadm', 'settle'])65    write(dev)66    run_command(['udevadm', 'settle'])67    run_command(['blockdev', '--setrw', dev])68    run_command(['udevadm', 'settle'])69    write(dev)70    log.info('cannot set device ro when in-use')71    with open(dev, 'r') as f:72        ret, _, _ = run_command(['blockdev', '--setro', dev], False)73        assert ret != 0, 'in-use device was set read-only!'74        run_command(['udevadm', 'settle'])75    run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name])76    run_command(['rbd', 'map', '--read-only', image_name])77    log.info('cannot write to newly mapped ro device')78    write(dev, True)79    log.info('can set ro mapped device rw')80    run_command(['blockdev', '--setrw', dev])81    run_command(['udevadm', 'settle'])82    write(dev)83def main():84    image_name = 'test1'85    # clean up any state from previous test runs86    teardown(image_name, False)87    setup(image_name)88    test_ro(image_name)89    teardown(image_name)90if __name__ == '__main__':...Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!
