How to use the run_command method in molecule

Best Python code snippet using molecule_python

test_binaries.py

Source: test_binaries.py (GitHub)

copy

Full Screen

...14 This is anything in the client/blackhat/bin folder (not including files in the installable folder)15 """16 def setUp(self) -> None:17 self.computer = init()18 def run_command(self, command, args=None):19 if args is None:20 args = []21 result = self.computer.run_command(command, args, True).data22 # This avoids "None type as no attribute strip"23 if not result:24 result = ""25 return result.strip("\n")26 def test_add_user(self):27 self.run_command("adduser", ["--version"])28 self.run_command("adduser", ["--help"])29 fail_bc_not_root_result = self.computer.run_command("adduser", ["testuser", "-p" "password", "-n"], True)30 expected_fail_bc_not_root_result = Result(success=False, message=ResultMessages.NOT_ALLOWED,31 data="adduser: Only root can add new users!\n")32 self.assertEqual(fail_bc_not_root_result, expected_fail_bc_not_root_result)33 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))34 create_new_user_result = self.computer.run_command("adduser", ["testuser", "-p" "password", "-n"], True)35 expected_create_new_user_result = Result(success=True, message=None, data=None)36 create_duplicate_user_result = self.computer.run_command("adduser", ["testuser", "-p" "password", "-n"], True)37 expected_create_duplicate_user_result = Result(success=False, message=ResultMessages.ALREADY_EXISTS,38 data="adduser: The user 'testuser' already exists.\n")39 self.assertEqual(create_new_user_result, expected_create_new_user_result)40 self.assertEqual(create_duplicate_user_result, expected_create_duplicate_user_result)41 # Now confirm that the user was actually added42 get_user_result = self.computer.get_user(username="testuser")43 expected_get_user_result = Result(success=True, message=None,44 data=User(1001, "testuser", md5("password".encode()).hexdigest()))45 self.assertEqual(get_user_result, expected_get_user_result)46 def test_base32(self):47 self.run_command("base32", ["--version"])48 self.run_command("base32", 
["--help"])49 message = "Hello!"50 self.run_command("touch", ["file"])51 # We're writing with a new line to replicate what `echo hello! > file` would do52 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)53 base32_result = self.run_command("base32", ["file"]).split(" ")[1].strip("\n")54 self.assertEqual(b32decode(base32_result).decode().strip("\n"), message)55 def test_base64(self):56 self.run_command("base64", ["--version"])57 self.run_command("base64", ["--help"])58 message = "Hello!"59 self.run_command("touch", ["file"])60 # We're writing with a new line to replicate what `echo hello! > file` would do61 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)62 base64_result = self.run_command("base64", ["file"]).split(" ")[1]63 self.assertEqual(b64decode(base64_result).decode().strip("\n"), message)64 def test_cat(self):65 self.run_command("cat", ["--version"])66 self.run_command("cat", ["--help"])67 message = "Hello World!"68 self.run_command("touch", ["file"])69 self.computer.sys_write("/home/steve/file", message)70 cat_result = self.run_command("cat", ["file"])71 self.assertEqual(message, cat_result)72 # cat on directory should fail73 result = self.computer.run_command("cat", ["/home/steve"], True)74 self.assertIn("Is a directory", result.data)75 # cat respects permission76 result = self.computer.run_command("cat", ["/etc/shadow"], True)77 self.assertIn("Permission denied", result.data)78 def test_cd(self):79 self.run_command("cd", ["--version"])80 self.run_command("cd", ["--help"])81 self.assertEqual(self.computer.sys_getcwd().pwd(), "/home/steve")82 self.run_command("cd", [".."])83 self.assertEqual(self.computer.sys_getcwd().pwd(), "/home")84 self.run_command("cd", ["..."])85 self.assertEqual(self.computer.sys_getcwd().pwd(), "/")86 self.run_command("cd", ["/etc/skel/Desktop"])87 self.assertEqual(self.computer.sys_getcwd().pwd(), "/etc/skel/Desktop")88 self.run_command("cd", ["~"])89 
self.assertEqual(self.computer.sys_getcwd().pwd(), "/home/steve")90 def test_chown(self):91 self.run_command("chown", ["--version"])92 self.run_command("chown", ["--help"])93 self.run_command("touch", ["testfile"])94 self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.owner, 1000)95 self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.group_owner, 1000)96 self.run_command("chown", ["root:root", "testfile"])97 self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.owner, 0)98 self.assertEqual(self.computer.fs.find("/home/steve/testfile").data.group_owner, 0)99 def test_clear(self):100 self.run_command("clear", ["--version"])101 self.run_command("clear", ["--help"])102 self.run_command("clear")103 def test_commands(self):104 self.run_command("commands", ["--version"])105 self.run_command("commands", ["--help"])106 commands_result = self.run_command("commands")107 self.assertIn("ls", commands_result)108 # Now lets remove ls from /bin (it shouldn't show up in commands)109 # We need root permissions110 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))111 move_result = self.computer.run_command("mv", ["/bin/ls", "/tmp"], True)112 self.assertTrue(move_result.success)113 commands_result = self.run_command("commands")114 self.assertNotIn("ls", commands_result)115 def test_date(self):116 self.run_command("date", ["--version"])117 self.run_command("date", ["--help"])118 date_result = self.run_command("date")119 local_timezone = datetime.datetime.now(datetime.timezone.utc).astimezone().tzinfo120 time_first = datetime.datetime.now().strftime('%a %b %d %I:%M:%S %p')121 time_second = datetime.datetime.now().strftime('%Y')122 expected_result = f"{time_first} {local_timezone} {time_second}"123 self.assertEqual(date_result, expected_result)124 def test_echo(self):125 self.run_command("echo", ["--version"])126 self.run_command("echo", ["--help"])127 echo_result = 
self.computer.run_command("echo", ["hello!"], True).data128 self.assertEqual(echo_result, "hello!\n")129 # Test -e flag130 echo_e_flag_result = self.computer.run_command("echo", ["-e", "hello\n\tworld!"], True).data131 self.assertEqual(echo_e_flag_result, "hello\n\tworld!\n")132 # Test -n flag133 echo_n_flag_result = self.computer.run_command("echo", ["-n", "hello world!\n"], True).data134 self.assertEqual(echo_n_flag_result, "hello world!\n")135 def test_env(self):136 self.run_command("env", ["--version"])137 self.run_command("env", ["--help"])138 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("env"))139 def test_export(self):140 self.run_command("export", ["--version"])141 self.run_command("export", ["--help"])142 # Lets get a baseline for the env before-hand143 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))144 self.run_command("export", ["BASH=/bin/bash"])145 env_result = self.run_command("printenv")146 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve\nBASH=/bin/bash", env_result)147 def test_head(self):148 self.run_command("head", ["--version"])149 self.run_command("head", ["--help"])150 message = "one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten\neleven\ntwelve\nthirteen\nfourteen\nfifteen\nsixteen\nseventeen\neighteen\nninteen\ntwenty"151 # Make a file with 20 lines152 self.run_command("touch", ["bigfile"])153 self.computer.sys_write("/home/steve/bigfile", message)154 expected_result = "\n".join(message.split("\n")[:-10])155 actual_result = self.run_command("head", ["bigfile"])156 self.assertEqual(expected_result, actual_result)157 # TODO: Test --wrap flag158 def test_hostname(self):159 self.run_command("hostname", ["--version"])160 self.run_command("hostname", ["--help"])161 # We want to try to set the hostname without root permission (should fail)162 set_hostname_result = self.computer.run_command("hostname", ["localhost"], True)163 
expected_set_hostname_result = Result(success=False,164 data="hostname: you must be root to change the host name")165 set_hostname_result.data = set_hostname_result.data.strip("\n")166 self.assertEqual(set_hostname_result, expected_set_hostname_result)167 # Make sure it failed168 get_hostname_result = self.run_command("hostname")169 self.assertNotEqual(get_hostname_result, "localhost")170 # Switch to root then change hostname171 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))172 self.run_command("hostname", ["localhost"])173 get_hostname_result = self.run_command("hostname")174 self.assertEqual(get_hostname_result, "localhost")175 def test_id(self):176 self.run_command("id", ["--version"])177 self.run_command("id", ["--help"])178 self_id_result = self.run_command("id")179 expected_self_id_result = "uid=1000(steve) gid=1000(steve) "180 self.assertEqual(self_id_result, expected_self_id_result)181 root_id_result = self.run_command("id", ["root"])182 root_id_by_uid_result = self.run_command("id", ["0"])183 expected_root_result = "uid=0(root) gid=0(root) "184 self.assertEqual(root_id_result, expected_root_result)185 self.assertEqual(root_id_by_uid_result, expected_root_result)186 # Check the flags187 id_result = self.run_command("id", ["-g"])188 self.assertEqual(id_result, "1000")189 id_result = self.run_command("id", ["-G"])190 self.assertEqual(id_result, "1000")191 id_result = self.run_command("id", ["-n"])192 self.assertEqual(id_result, "steve")193 def test_ls(self):194 import re195 self.run_command("ls", ["--version"])196 self.run_command("ls", ["--help"])197 # In the user's home directory, The files should be: Desktop Documents Downloads Music Pictures Public Templates Videos198 ls_result = self.computer.run_command("ls", ["--no-color"], True)199 expected_ls_result = Result(success=True, message=None,200 data="Desktop Documents Downloads Music Pictures Public Templates Videos ")201 ls_result.data = 
ls_result.data.strip("\n")202 color_filter = re.compile(r'\x1b[^m]*m')203 stripped_color = color_filter.sub('', ls_result.data)204 self.assertEqual(stripped_color, expected_ls_result.data)205 # Make sure .shellrc isn't there (as -a isn't specified)206 self.assertNotIn(".shellrc", ls_result.data)207 # Test -a flag (show hidden files)208 ls_result = self.run_command("ls", ["--no-color", "-a"])209 self.assertIn(".shellrc", ls_result)210 # Make sure -l flag works211 ls_result = self.run_command("ls", ["--no-color", "-l"])212 self.assertIn("steve", ls_result)213 def test_md5sum(self):214 self.run_command("md5sum", ["--version"])215 self.run_command("md5sum", ["--help"])216 message = "hello!"217 self.run_command("touch", ["file"])218 # We're writing with a new line to replicate what `echo hello! > file` would do219 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)220 self.assertIn(md5((message + "\n").encode()).hexdigest(), self.run_command("md5sum", ["file"]))221 self.assertIn(md5(message.encode()).hexdigest(), self.run_command("md5sum", ["file", "-z"]))222 self.assertEqual(self.run_command("md5sum", ["file", "-z", "--tag"]),223 f"MD5 (file) = {md5(message.encode()).hexdigest()}")224 def test_mv(self):225 self.run_command("mv", ["--version"])226 self.run_command("mv", ["--help"])227 # Make sure /bin/pwd exists228 find_pwd = self.computer.fs.find("/bin/pwd")229 self.assertTrue(find_pwd.success)230 # Make sure permissions work when moving231 self.assertFalse(self.computer.run_command("mv", ["/bin/pwd", "/tmp"], True).success)232 # Make sure /bin/pwd still exists233 find_pwd = self.computer.fs.find("/bin/pwd")234 self.assertTrue(find_pwd.success)235 # Make sure /tmp/pwd DOESN'T exists236 find_pwd = self.computer.fs.find("/tmp/pwd")237 self.assertFalse(find_pwd.success)238 # Root perms!239 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))240 self.run_command("mv", ["/bin/pwd", "/tmp"])241 # 
Make sure /bin/pwd DOESN'T exists242 find_pwd = self.computer.fs.find("/bin/pwd")243 self.assertFalse(find_pwd.success)244 # Make sure /tmp/pwd exists245 find_pwd = self.computer.fs.find("/tmp/pwd")246 self.assertTrue(find_pwd.success)247 def test_passwd(self):248 self.run_command("passwd", ["--version"])249 self.run_command("passwd", ["--help"])250 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))251 self.computer.run_command("adduser", ["testuser", "-p" "tester", "-n"], True)252 self.computer.sessions.pop()253 # su to sanity check password254 with unittest.mock.patch("getpass.getpass", return_value="tester"):255 with unittest.mock.patch("getpass.fallback_getpass", return_value="tester"):256 self.run_command("su", ["testuser"])257 self.assertEqual("testuser", self.run_command("whoami"))258 self.computer.sessions.pop()259 # Change testuser's password260 print(self.run_command("passwd", ["testuser", "-p", "newpassword"]))261 # Confirm the old password doesn't work262 result = self.run_command("su", ["testuser"])263 self.assertIn("Authentication failure", result)264 # A valid password is required to reset the user's password265 result = self.run_command("passwd", ["testuser", "-p", "newnewpassword"])266 self.assertIn("Authentication token manipulation error", result)267 # Confirm the new password works268 with unittest.mock.patch("getpass.getpass", return_value="newpassword"):269 with unittest.mock.patch("getpass.fallback_getpass", return_value="newpassword"):270 self.run_command("su", ["testuser"])271 self.assertEqual("testuser", self.run_command("whoami"))272 self.computer.sessions.pop()273 def test_printenv(self):274 self.run_command("printenv", ["--version"])275 self.run_command("printenv", ["--help"])276 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))277 def test_pwd(self):278 self.run_command("pwd", ["--version"])279 self.run_command("pwd", ["--help"])280 
pwd_result = self.run_command("pwd")281 self.assertEqual("/home/steve", pwd_result)282 def test_rm(self):283 self.run_command("rm", ["--version"])284 self.run_command("rm", ["--help"])285 # Make some test files to remove286 self.run_command("touch", ["file1", "file2", "file3", "/tmp/file4"])287 ls_result = self.run_command("ls")288 # Sanity check289 for file in ["file1", "file2", "file3"]:290 self.assertIn(file, ls_result)291 # rm a single file (relative path)292 self.run_command("rm", ["file1"])293 ls_result = self.run_command("ls")294 self.assertNotIn("file1", ls_result)295 # rm a file (absolute path)296 self.run_command("rm", ["/tmp/file4"])297 ls_result = self.run_command("ls", ["/tmp"])298 self.assertNotIn("file4", ls_result)299 # TODO: Allow multiple files to be deleted and then add test300 # Delete non-existent file301 rm_doesnt_exist = self.computer.run_command("rm", ["/tmp/badfile"], True)302 self.assertFalse(rm_doesnt_exist.success)303 self.assertIn("No such file or directory", rm_doesnt_exist.data)304 # No rm permissions305 rm_no_perms = self.computer.run_command("rm", ["/etc/passwd"], False)306 self.assertFalse(rm_no_perms.success)307 self.assertIn("Permission denied", rm_no_perms.data)308 # Confirm it's still there309 self.assertTrue(self.computer.run_command("stat", ["/etc/passwd"], True).success)310 # Can't remove populated directory311 self.run_command("mkdir", ["files"])312 self.run_command("cd", ["files"])313 self.run_command("touch", ["file1", "file2"])314 self.run_command("cd", [".."])315 rm_full_dir = self.computer.run_command("rm", ["files"], True)316 self.assertFalse(rm_full_dir.success)317 self.assertIn("Is a directory", rm_full_dir.data)318 # Confirm it's still there319 self.assertTrue(self.computer.run_command("stat", ["files"], True).success)320 # Remove directory with -r321 rm_dir_with_flag = self.computer.run_command("rm", ["-r", "files"], True)322 self.assertTrue(rm_dir_with_flag.success)323 # Confirm324 
self.assertFalse(self.computer.run_command("stat", ["files"], True).success)325 # rm *326 self.run_command("mkdir", ["files"])327 self.run_command("cd", ["files"])328 self.run_command("touch", ["file1", "file2"])329 rm_star = self.computer.run_command("rm", ["*"], True)330 self.assertTrue(rm_star.success)331 ls_result = self.run_command("ls")332 self.assertNotIn("file1", ls_result)333 self.assertNotIn("file2", ls_result)334 def test_sha1sum(self):335 self.run_command("sha1sum", ["--version"])336 self.run_command("sha1sum", ["--help"])337 message = "hello!"338 self.run_command("touch", ["file"])339 # We're writing with a new line to replicate what `echo hello! > file` would do340 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)341 self.assertIn(sha1((message + "\n").encode()).hexdigest(), self.run_command("sha1sum", ["file"]))342 self.assertIn(sha1(message.encode()).hexdigest(), self.run_command("sha1sum", ["file", "-z"]))343 self.assertEqual(self.run_command("sha1sum", ["file", "-z", "--tag"]),344 f"SHA1 (file) = {sha1(message.encode()).hexdigest()}")345 def test_sha224sum(self):346 self.run_command("sha224sum", ["--version"])347 self.run_command("sha224sum", ["--help"])348 message = "hello!"349 self.run_command("touch", ["file"])350 # We're writing with a new line to replicate what `echo hello! 
> file` would do351 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)352 self.assertIn(sha224((message + "\n").encode()).hexdigest(),353 self.run_command("sha224sum", ["file"]))354 self.assertIn(sha224(message.encode()).hexdigest(),355 self.run_command("sha224sum", ["file", "-z"]))356 self.assertEqual(self.run_command("sha224sum", ["file", "-z", "--tag"]),357 f"SHA224 (file) = {sha224(message.encode()).hexdigest()}")358 def test_sha256sum(self):359 self.run_command("sha256sum", ["--version"])360 self.run_command("sha256sum", ["--help"])361 message = "hello!"362 self.run_command("touch", ["file"])363 # We're writing with a new line to replicate what `echo hello! > file` would do364 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)365 self.assertIn(sha256((message + "\n").encode()).hexdigest(),366 self.run_command("sha256sum", ["file"]))367 self.assertIn(sha256(message.encode()).hexdigest(),368 self.run_command("sha256sum", ["file", "-z"]))369 self.assertEqual(self.run_command("sha256sum", ["file", "-z", "--tag"]),370 f"SHA256 (file) = {sha256(message.encode()).hexdigest()}")371 def test_sha384sum(self):372 self.run_command("sha384sum", ["--version"])373 self.run_command("sha384sum", ["--help"])374 message = "hello!"375 self.run_command("touch", ["file"])376 # We're writing with a new line to replicate what `echo hello! 
> file` would do377 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)378 self.assertIn(sha384((message + "\n").encode()).hexdigest(),379 self.run_command("sha384sum", ["file"]))380 self.assertIn(sha384(message.encode()).hexdigest(),381 self.run_command("sha384sum", ["file", "-z"]))382 self.assertEqual(self.run_command("sha384sum", ["file", "-z", "--tag"]),383 f"SHA384 (file) = {sha384(message.encode()).hexdigest()}")384 def test_sha512sum(self):385 self.run_command("sha512sum", ["--version"])386 self.run_command("sha512sum", ["--help"])387 message = "hello!"388 self.run_command("touch", ["file"])389 # We're writing with a new line to replicate what `echo hello! > file` would do390 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)391 self.assertIn(sha512((message + "\n").encode()).hexdigest(),392 self.run_command("sha512sum", ["file"]))393 self.assertIn(sha512(message.encode()).hexdigest(),394 self.run_command("sha512sum", ["file", "-z"]))395 self.assertEqual(self.run_command("sha512sum", ["file", "-z", "--tag"]),396 f"SHA512 (file) = {sha512(message.encode()).hexdigest()}")397 def test_su(self):398 self.run_command("su", ["--version"])399 self.run_command("su", ["--help"])400 self.run_command("su", ["--version"])401 # Sanity check402 self.assertIn("uid=1000(steve) gid=1000(steve)", self.run_command("id"))403 with unittest.mock.patch("getpass.getpass", return_value="password"):404 with unittest.mock.patch("getpass.fallback_getpass", return_value="password"):405 self.run_command("su")406 self.assertIn("uid=0(root) gid=0(root)", self.run_command("id"))407 self.assertTrue(self.computer.run_command("adduser", ["testuser", "-p" "NOTPASSWORD", "-n"], True).success)408 # Now that we're root, we should be able to change user without a password409 self.run_command("su", ["testuser"])410 self.assertIn("uid=1001(testuser) gid=1001(testuser)", self.run_command("id"))411 def test_sudo(self):412 
self.run_command("sudo", ["--version"])413 self.run_command("sudo", ["--version"])414 # Sanity check415 sudo_result = self.run_command("id")416 self.assertIn("uid=1000(steve) gid=1000(steve)", sudo_result)417 # Add a new user to test later418 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))419 self.run_command("adduser", ["testuser", "-p" "password", "-n"])420 self.computer.sessions.pop()421 # We do both "getpass" and "fallback_getpass" because some terminals422 # (pycharm's one for example) uses the fallback function423 with unittest.mock.patch("getpass.getpass", return_value="password"):424 with unittest.mock.patch("getpass.fallback_getpass", return_value="password"):425 sudo_root_result = self.run_command("sudo", ["id"])426 self.assertIn("uid=0(root) gid=0(root)", sudo_root_result)427 other_user_result = self.run_command("sudo", ["-u", "testuser", "id"])428 self.assertIn("uid=1001(testuser) gid=1001(testuser)", other_user_result)429 def test_tail(self):430 self.run_command("tail", ["--version"])431 self.run_command("tail", ["--help"])432 message = "one\ntwo\nthree\nfour\nfive\nsix\nseven\neight\nnine\nten\neleven\ntwelve\nthirteen\nfourteen\nfifteen\nsixteen\nseventeen\neighteen\nninteen\ntwenty"433 # Make a file with 20 lines434 self.run_command("touch", ["bigfile"])435 self.computer.sys_write("/home/steve/bigfile", message)436 expected_result = "\n".join(message.split("\n")[10:])437 actual_result = self.run_command("tail", ["bigfile"])438 self.assertEqual(expected_result, actual_result)439 # TODO: Test --wrap flag440 def test_touch(self):441 self.run_command("touch", ["--version"])442 self.run_command("touch", ["--help"])443 touch_result = self.computer.run_command("touch", ["testfile"], True)444 expected_touch_result = Result(success=True)445 # We should double-check that the file exists in our home directory446 self.assertIn("testfile", self.run_command("ls"))447 self.assertEqual(touch_result, 
expected_touch_result)448 def test_uname(self):449 self.run_command("uname", ["--version"])450 self.run_command("uname", ["--help"])451 # Test some misc uname configurations452 self.assertEqual(self.run_command("uname"), "Linux")453 self.assertEqual(self.run_command("uname", ["-a"]),454 f"Linux {self.computer.hostname} 1.1 v1 x86_64 Blackhat/Linux")455 self.assertEqual(self.run_command("uname", ["-mns"]),456 f"Linux {self.computer.hostname} x86_64 ")457 self.assertEqual(self.run_command("uname", ["-op", "-i"]),458 "unknown unknown Blackhat/Linux ")459 def test_unset(self):460 self.run_command("unset", ["--version"])461 self.run_command("unset", ["--help"])462 # Lets get a baseline for the env before-hand463 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))464 self.run_command("export", ["BASH=/bin/bash"])465 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve\nBASH=/bin/bash",466 self.run_command("printenv"))467 self.run_command("unset", ["BASH"])468 self.assertEqual("PATH=/bin:/usr/bin\nHOME=/home/steve\nUSER=steve", self.run_command("printenv"))469 def test_uptime(self):470 self.run_command("uptime", ["--version"])471 self.run_command("uptime", ["--help"])472 # Make sure it doesn't crash473 uptime_result = self.run_command("uptime")474 # TODO: See if there's a better way to fix this without wasting 5 seconds475 self.assertEqual(uptime_result, "uptime: 0:00:00")476 sleep(5)477 uptime_result = self.run_command("uptime")478 self.assertEqual(uptime_result, "uptime: 0:00:05")479 uptime_result = self.run_command("uptime", ["-p"])480 self.assertEqual(uptime_result, "up 0 minutes")481 now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")482 uptime_result = self.run_command("uptime", ["-s"])483 # I'm going to strip out the seconds because different speed test runners can take different amount of times484 # to run485 now = ":".join(now.split(":")[:-1])486 uptime_result = 
":".join(uptime_result.split(":")[:-1])487 # We have to check +1 or -1 minute because there is a possibility that it can be one minute behind488 minus_one_minute = uptime_result.split(":")489 minus_one_minute[1] = "{:02d}".format(int(minus_one_minute[1]) - 1)490 minus_one_minute = ":".join(minus_one_minute)491 plus_one_minute = uptime_result.split(":")492 plus_one_minute[1] = "{:02d}".format(int(plus_one_minute[1]) + 1)493 plus_one_minute = ":".join(plus_one_minute)494 self.assertIn(now, [minus_one_minute, uptime_result, plus_one_minute])495 def test_users(self):496 self.run_command("users", ["--version"])497 self.run_command("users", ["--help"])498 # One session = only one user499 users_result = self.run_command("users")500 self.assertEqual(users_result, "steve ")501 # Add new sessions then test502 self.computer.sessions.append(Session(0, self.computer.fs.files, self.computer.sessions[-1].id + 1))503 users_result = self.run_command("users")504 self.assertEqual(users_result, "steve root ")505 self.computer.sessions.append(Session(1000, self.computer.fs.files, self.computer.sessions[-1].id + 1))506 users_result = self.run_command("users")507 self.assertEqual(users_result, "steve root steve ")508 def test_wc(self):509 self.run_command("wc", ["--version"])510 self.run_command("wc", ["--help"])511 message = "hello!"512 message_len = len(message + "\n")513 self.run_command("touch", ["file"])514 # We're writing with a new line to replicate what `echo hello! 
> file` would do515 self.computer.fs.find("/home/steve/file").data.write(message + "\n", self.computer)516 wc_result = self.run_command("wc", ["file"])517 wc_split_results = wc_result.split(" ")518 while "" in wc_split_results:519 wc_split_results.remove("")520 total_new_line_count, total_word_count, total_byte_count = wc_split_results[0], wc_split_results[1], \521 wc_split_results[2]522 self.assertEqual(int(total_byte_count), message_len)523 self.assertEqual(int(total_new_line_count), 1)524 self.assertEqual(int(total_word_count), 1)525 wc_result = self.run_command("wc", ["file", "-c"])526 wc_split_results = wc_result.split(" ")527 while "" in wc_split_results:528 wc_split_results.remove("")529 total_byte_count = wc_split_results[0]530 self.assertEqual(int(total_byte_count), message_len)531 wc_result = self.run_command("wc", ["file", "-m"])532 wc_split_results = wc_result.split(" ")533 while "" in wc_split_results:534 wc_split_results.remove("")535 total_byte_count = wc_split_results[0]536 self.assertEqual(int(total_byte_count), message_len)537 def test_who(self):538 self.run_command("who", ["--version"])539 self.run_command("who", ["--help"])540 self.assertEqual(self.run_command("who"), "steve\tpts/0")541 def test_whoami(self):542 self.run_command("whoami", ["--version"])543 self.run_command("whoami", ["--help"])544 self.assertEqual(self.run_command("whoami"), "steve")545class TestInstallableBinaries(unittest.TestCase):546 """547 These test cases test the functionality of the binaries that are NOT included with the system548 (anything installable through apt)549 This is anything in the client/blackhat/bin/installable folder550 """551 def setUp(self) -> None:552 self.computer = init()553 def run_command(self, command, args=None):554 if args is None:555 args = []556 result = self.computer.run_command(command, args, True).data557 # This avoids "None type as no attribute strip"558 if not result:559 result = ""...

Full Screen

Full Screen

setup.py

Source: setup.py (GitHub)

copy

Full Screen

# ... (lines 1-5 of the original file are elided from this listing)
#
# The listing starts in the middle of a function whose `def` line is not
# shown.  The orphaned statements are reproduced verbatim below, as comments,
# rather than being attached to a guessed header:
#
#     # log_msg(f"###### Setup iSCSI target {id}")
#     # log_msg(ISCSI_TARGET_IMGS)
#     TGT_BS=ISCSI_TARGET_IMGS[id]
#     ISCSI_TARGET_NAME=ISCSI_TARGET_NAMES[id]
#     run_command(f"mkdir -p {IMG_DIR}")
#     run_command(f"fallocate -l {DUMMY_DISK_SIZE} {TGT_BS}")
#     run_command(f"sudo {PFE_USR_DIR}/tgtadm --lld iscsi --op new --mode target --tid {id} -T {ISCSI_TARGET_NAME}")
#     run_command(f"sudo {PFE_USR_DIR}/tgtadm --lld iscsi --op new --mode logicalunit --tid {id} --lun 1 -b {TGT_BS}")
#     run_command(f"sudo {PFE_USR_DIR}/tgtadm --lld iscsi --op bind --mode target --tid {id} -I ALL")
#     # run_command(f"sudo {PFE_USR_DIR}/tgtadm --lld iscsi --op show --mode target")


def set_initiator(ids):
    """Restart open-iscsi, discover the target portal and log in to each target in *ids*."""
    log_msg(" ==== Starting iSCSI initiator ====", emphasize=True)
    run_command("sudo service open-iscsi stop && sudo service open-iscsi start", suppress_output=True)
    run_command(f"sudo iscsiadm --mode discovery --type sendtargets --portal {ISCSI_TARGET_IP}")
    for target_id in ids:
        target_name = ISCSI_TARGET_NAMES[target_id]
        run_command(f"sudo iscsiadm --mode node --targetname {target_name} --portal {ISCSI_TARGET_IP}:3260 --login")


def run_tgtd():
    """Recreate the log files with mode 666 and start tgtd in the background."""
    log_msg(" ==== Starting iSCSI daemon ====", emphasize=True)
    log_files = f"{IO_HEAD_LOG} {IO_DATA_LOG} {IO_HEAD_SPLIT_LOG} {ERR_BLOCK_LOG} {TGTD_LOG}"
    run_command(f"mkdir -p {PFE_LOG_DIR}")
    run_command(f"sudo rm {log_files}")
    run_command(f"touch {log_files}")
    run_command(f"sudo chmod 666 {log_files}")
    # Run tgtd in the foreground (-f) but backgrounded by the shell, with
    # stdout/stderr captured in TGTD_LOG.
    run_command(f"sudo {PFE_USR_DIR}/tgtd -f --pfe-io-header-log {IO_HEAD_LOG} --pfe-fail-type-tgtd 0 --pfe-err-blk {ERR_BLOCK_LOG} --pfe-io-data-log {IO_DATA_LOG} --pfe-enable-record 0 > {TGTD_LOG} 2>&1 &")
    # run_command(f"sudo {PFE_USR_DIR}/tgtd --pfe-io-header-log {IO_HEAD_LOG} --pfe-fail-type-tgtd 0 --pfe-err-blk {ERR_BLOCK_LOG} --pfe-io-data-log {IO_DATA_LOG} --pfe-enable-record 0 > {TGTD_LOG} 2>&1 ")


def kill_tgtd():
    """Send SIGKILL to every tgtd process."""
    log_msg(" ==== Killing iSCSI daemon ====", warning=True)
    run_command("sudo killall -9 tgtd")


def clean_logs():
    """Truncate each existing I/O log file (TGTD_LOG is deliberately left alone)."""
    log_msg(" ==== Cleaning log files ====", warning=True)
    # LOGS = [IO_HEAD_LOG, IO_DATA_LOG, IO_HEAD_SPLIT_LOG, ERR_BLOCK_LOG, TGTD_LOG]
    for log_path in (IO_HEAD_LOG, IO_DATA_LOG, IO_HEAD_SPLIT_LOG, ERR_BLOCK_LOG):
        if os.path.exists(log_path):
            run_command(f"cat /dev/null > {log_path}")


def start_log():
    """Create the ``start_log`` flag file under PFE_USR_DIR."""
    run_command(f"touch {PFE_USR_DIR}/start_log")


def stop_log():
    """Remove the ``start_log`` flag file under PFE_USR_DIR."""
    run_command(f"rm -f {PFE_USR_DIR}/start_log")


def start_replay():
    """Create the ``start_replay`` flag file under PFE_USR_DIR."""
    run_command(f"touch {PFE_USR_DIR}/start_replay")


def stop_replay():
    """Remove the ``start_replay`` flag file under PFE_USR_DIR."""
    run_command(f"rm -f {PFE_USR_DIR}/start_replay")


def reset_queues():
    """Delete and re-create (empty) every path listed in QUEUES."""
    log_msg(" ==== Resetting message queues ====", warning=True)
    for queue in QUEUES:
        # Guard against an empty or root path ending up in an `rm -f`.
        assert(queue != "" and queue != "/")
        run_command(f"rm -f {queue}")
        run_command(f"touch {queue}")


def reset_flags():
    """Remove the HARVEST_FLAG and END_FLAG files."""
    log_msg(" ==== Resetting flags ====", warning=True)
    run_command(f"rm -f {HARVEST_FLAG}")
    run_command(f"rm -f {END_FLAG}")


def clear_queues():
    """Run ``./mqueue_cleaner`` twice, as the original script does."""
    log_msg(" ==== Clearing message queues ====", warning=True)
    run_command("sudo ./mqueue_cleaner")
    run_command("sudo ./mqueue_cleaner")


def obtain_disk_id(ids):
    """Return the names of block devices whose ``lsblk -d`` size column equals DUMMY_DISK_SIZE_STRING.

    Asserts that exactly ``len(ids)`` such devices were found.
    """
    lsblk_out = run_command("sudo lsblk -d", output=True)['stdout']
    disk_ids = []
    for row in lsblk_out.strip().split("\n"):
        fields = row.split()
        # Column 0 is the device name, column 3 its size.  Rows that are too
        # short (e.g. empty output) are skipped instead of raising IndexError.
        if len(fields) > 3 and fields[3] == DUMMY_DISK_SIZE_STRING:
            disk_ids.append(fields[0])
    # FIXME: find a way to restart the script instead of exit
    assert(len(disk_ids) == len(ids))

    log_msg("Virtual disk ids", disk_ids)
    return disk_ids


def kill_vms(vms):
    """Request shutdown of every VM in *vms* and poll (every 3 s) until all are listed as shut off."""
    log_msg(" ==== Killing VMs ====", warning=True)
    for vm in vms:
        run_command(f"sudo virsh shutdown {vm}")
    # check if all vms are shutdown
    while True:
        shut_off = run_command("sudo virsh list --all --state-shutoff", output=True)['stdout']
        if all(vm in shut_off for vm in vms):
            break
        time.sleep(3)


def start_vms(vms):
    """Start every VM in *vms*, wait until all are listed as running, then wait for ssh to respond."""
    log_msg(" ==== Starting VMs ====", emphasize=True)
    for vm in vms:
        run_command(f"sudo virsh start {vm}")
    # check if all vms are running
    while True:
        running = run_command("sudo virsh list --all --state-running", output=True)['stdout']
        if all(vm in running for vm in vms):
            break
        time.sleep(3)

    # test ssh connection: retry while ssh reports "No route to host"
    log_msg("Waiting for ssh connections (may take a while)")
    for vm in vms:
        while "No route to host" in run_command(f"ssh {vm} 'date'", error=True, verbose=False)['stderr']:
            time.sleep(3)


def attach_device(disk_ids, vms):
    """Attach ``/dev/<disk_ids[i]>`` to ``vms[i]`` as target ``vdb``."""
    log_msg(" ==== Attaching VM devices ====", emphasize=True)
    for vm, disk_id in zip(vms, disk_ids[:len(vms)] if len(disk_ids) >= len(vms) else _pad_error(disk_ids, vms)):
        run_command(f"sudo virsh attach-disk {vm} /dev/{disk_id} --target vdb")


def _pad_error(disk_ids, vms):
    """Raise the same IndexError the index-based loop would when there are fewer disks than VMs."""
    raise IndexError("list index out of range")


def detach_device(vms):
    """Detach the last ``/dev/vd*`` disk from every running VM in *vms*.

    VMs that are not in the running list, or whose last disk is ``vda``
    (i.e. nothing extra is attached), are skipped.
    """
    log_msg(" ==== Detaching VM disks ====", warning=True)
    running = run_command("sudo virsh list --all --state-running", output=True)['stdout']
    for vm in vms:
        if vm not in running:
            continue
        # Keep this in its own variable: the original reused one name for both
        # outputs, so from the second VM on the "is it running?" check above
        # was made against the previous VM's `ls` output.
        vm_disks = run_command(f"ssh {vm} 'ls /dev/vd*'", output=True)['stdout']
        ## FIXME: by default we assume the last disk in VM is the virtual disk
        disk_id = vm_disks.strip().split("\n")[-1].split("/")[-1]
        ## we only find the default disk -- no additional virtual disks are attached yet
        if "vda" in disk_id:
            continue
        run_command(f"sudo virsh detach-disk {vm} --target {disk_id}")


def run_workloads(with_vm, vms, workload="terasort", mode="harvest"):
    """Launch *workload* and then ycsb, inside the VMs or on the host.

    In ``"harvest"`` mode the ycsb launch is delayed until HARVEST_FLAG exists
    (polled every 5 s).
    """
    # NOTE(review): indentation was lost in this listing.  The ycsb launch is
    # placed after (not inside) the `mode == "harvest"` wait -- confirm against
    # the original file.
    log_msg(f" ==== Running workload {workload} with {mode} mode ====", emphasize=True)
    if with_vm:
        run_command(f"ssh {vms[0]} 'sudo python3 workload_parser.py {workload} {workload}.out' &")
        if mode == "harvest":
            while not os.path.exists(HARVEST_FLAG):
                time.sleep(5)
        run_command(f"ssh {vms[1]} 'sudo python3 workload_parser.py ycsb ycsb.out ' &")
    else:
        run_command(f"sudo python3 workload.py {workload} {USR_DIR}/logs/{workload}.out &")
        if mode == "harvest":
            while not os.path.exists(HARVEST_FLAG):
                time.sleep(5)
        log_msg(" ==== Running regular workload ycsb ====", emphasize=True)
        run_command(f"sudo python3 workload.py ycsb {USR_DIR}/logs/ycsb.out &")


def stop_workloads(with_vm, vms):
    """Kill the workload and joblib python3 processes, in each running VM or on the host."""
    log_msg(" ==== Stopping workloads ====", warning=True)
    if with_vm:
        running = run_command("sudo virsh list --all --state-running", output=True)['stdout']
        for vm in vms:
            if vm not in running:
                continue
            # `\\$` yields the same backslash-dollar the original's invalid
            # "\$" escape produced, without the invalid-escape warning; the
            # backslash keeps `$1` from being expanded inside the double quotes.
            run_command("ssh " + vm + " \"ps ax | grep 'python3 workload_parser.py' | awk '{ print \\$1 }' | xargs sudo kill -9\" ")
            run_command("ssh " + vm + " \"ps ax | grep 'python3 joblib' | awk '{ print \\$1 }' | xargs sudo kill -9\" ")
    else:
        run_command("ps ax | grep 'python3 workload.py' | awk '{ print $1 }' | xargs sudo kill -9")
        run_command("ps ax | grep 'python3 joblib' | awk '{ print $1 }' | xargs sudo kill -9")


def stop_ocssd():
    """Kill every ``harvest`` process."""
    log_msg(" ==== Stopping blockflex ====", warning=True)
    run_command("sudo killall harvest")


def run_ocssd(workload="terasort", mode="harvest"):
    """Start ``../blockflex/harvest`` in the background with arguments chosen by *mode*.

    Output is redirected into ``{USR_DIR}/results/{workload}``.  A *mode*
    other than ``"static"``, ``"harvest"`` or ``"unsold"`` starts nothing.
    """
    log_msg(" ==== Running blockflex ====", emphasize=True)
    OUT_DIR = f"{USR_DIR}/results/{workload}"
    if not os.path.exists(OUT_DIR):
        os.mkdir(OUT_DIR)
    # NOTE(review): indentation was lost in this listing; the chown may have
    # been inside the `if` above.  Running it unconditionally is the superset.
    run_command(f"sudo chown $USER {OUT_DIR}")
    # mode -> (harvest arguments, output file name)
    launch = {
        "static": ("0 0", "no_harvest_bw.out"),
        "harvest": ("4 4", "harvest_bw.out"),
        "unsold": ("0 4", "unsold_harvest_bw.out"),
    }
    if mode in launch:
        harvest_args, out_name = launch[mode]
        run_command(f"sudo ../blockflex/harvest {harvest_args} > {OUT_DIR}/{out_name} &")


def run_predictor():
    """Write '0' to both predictor input files and start ``pred.py`` in the background."""
    log_msg(" ==== Running predictor ====", emphasize=True)
    run_command(f"echo '0' > {BLOCKFLEX_DIR}/bw_inputs.txt")
    run_command(f"echo '0' > {BLOCKFLEX_DIR}/sz_inputs.txt")
    run_command(f"sudo python3 ../blockflex/pred.py {USR_DIR}/logs/predictor.out &")


def stop_predictor():
    """Kill every process whose ``ps ax`` line mentions ``pred.py``."""
    log_msg(" ==== Stopping predictor ====", warning=True)
    run_command("ps ax | grep 'pred.py' | awk '{ print $1 }' | xargs sudo kill -9")


if __name__ == "__main__":
    ids = [1,2]
    vms = ["vm0", "vm1"]
    VERBOSE = True
    WITH_VM = False
    running_mode = 'regular'
    if len(sys.argv) >= 2:
        running_mode = sys.argv[1]
    # Tear down anything left over from a previous run.
    stop_predictor()
    stop_workloads(WITH_VM, vms)
    stop_ocssd()
    clear_queues()
    if WITH_VM:
        detach_device(vms)
    # ... (the listing is truncated here)

Full Screen

Full Screen

clear_cursors_carets_last_selection_unit_tests.py

Source:clear_cursors_carets_last_selection_unit_tests.py Github

copy

Full Screen

# ... (lines 1-15 of the original file are elided from this listing)
class ClearCursorsCaretsLastSelectionUnitTests(utilities.BasicSublimeTextViewTestCase):
    """Check which region survives ``single_selection_last`` after N ``find_under_expand`` calls."""

    def _expand_then_keep_last(self, start, expansions):
        """Run ``find_under_expand`` *expansions* times, then ``single_selection_last``.

        *start* is passed straight to ``create_test_text`` (presumably the
        initial caret offset -- confirm against the ``utilities`` helper).
        Returns the first region left in the view's selection.
        """
        self.create_test_text(start)
        for _ in range(expansions):
            self.view.window().run_command( "find_under_expand" )
        self.view.window().run_command( "single_selection_last" )
        return self.view.sel()[0]

    def test_1_selections_at_first_word(self):
        region = self._expand_then_keep_last(0, 1)
        self.assertEqual( sublime.Region(0, 4), region )

    def test_2_selections_at_first_word(self):
        region = self._expand_then_keep_last(0, 2)
        self.assertEqual( sublime.Region(5, 9), region )

    def test_6_selections_at_first_word(self):
        region = self._expand_then_keep_last(0, 6)
        self.assertEqual( sublime.Region(26, 30), region )

    def test_6_selections_plus_redundant_expand_at_first_word(self):
        # Seven expands: six selections plus the redundant one the test name
        # promises.  The original issued only six, making it an exact
        # duplicate of test_6_selections_at_first_word.
        region = self._expand_then_keep_last(0, 7)
        self.assertEqual( sublime.Region(26, 30), region )

    def test_2_selections_at_last_word(self):
        region = self._expand_then_keep_last(26, 2)
        self.assertEqual( sublime.Region(0, 4), region )

    def test_3_selections_at_last_word(self):
        region = self._expand_then_keep_last(26, 3)
        self.assertEqual( sublime.Region(5, 9), region )

    def test_6_selections_at_last_word(self):
        region = self._expand_then_keep_last(26, 6)
        self.assertEqual( sublime.Region(21, 25), region )

    def test_6_selections_at_last_word_plus_redundant_expansion_at_last_word(self):
        # Seven expands: the seventh is the redundant one.
        region = self._expand_then_keep_last(26, 7)
        self.assertEqual( sublime.Region(21, 25), region )

# ... (the listing is truncated here)

Full Screen

Full Screen

set_ro.py

Source:set_ro.py Github

copy

Full Screen

...3import subprocess4import sys5logging.basicConfig(level=logging.DEBUG)6log = logging.getLogger()7def run_command(args, except_on_error=True):8 log.debug('running command "%s"', ' '.join(args))9 proc = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)10 out, err = proc.communicate()11 if out:12 log.debug('stdout: %s', out)13 if err:14 log.debug('stderr: %s', err)15 if proc.returncode:16 log.debug('ret: %d', proc.returncode)17 if except_on_error:18 raise subprocess.CalledProcessError(proc.returncode, ' '.join(args))19 return (proc.returncode, out, err)20def setup(image_name):21 run_command(['rbd', 'create', '-s', '100', image_name])22 run_command(['rbd', 'snap', 'create', image_name + '@snap'])23 run_command(['rbd', 'map', image_name])24 run_command(['rbd', 'map', image_name + '@snap'])25def teardown(image_name, fail_on_error=True):26 run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name + '@snap'], fail_on_error)27 run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name], fail_on_error)28 run_command(['rbd', 'snap', 'rm', image_name + '@snap'], fail_on_error)29 run_command(['rbd', 'rm', image_name], fail_on_error)30def write(target, expect_fail=False):31 try:32 with open(target, 'w', 0) as f:33 f.write('test')34 f.flush()35 assert not expect_fail, 'writing should have failed'36 except IOError:37 assert expect_fail, 'writing should not have failed'38def test_ro(image_name):39 dev = '/dev/rbd/rbd/' + image_name40 snap_dev = dev + '@snap'41 log.info('basic device is readable')42 write(dev)43 log.info('basic snapshot is read-only')44 write(snap_dev, True)45 log.info('cannot set snapshot rw')46 ret, _, _ = run_command(['blockdev', '--setrw', snap_dev], False)47 assert ret != 0, 'snapshot was set read-write!'48 run_command(['udevadm', 'settle'])49 write(snap_dev, True)50 log.info('set device ro')51 run_command(['blockdev', '--setro', dev])52 run_command(['udevadm', 'settle'])53 write(dev, True)54 log.info('cannot set device rw when in-use')55 
with open(dev, 'r') as f:56 ret, _, _ = run_command(['blockdev', '--setro', dev], False)57 assert ret != 0, 'in-use device was set read-only!'58 run_command(['udevadm', 'settle'])59 write(dev, True)60 run_command(['blockdev', '--setro', dev])61 run_command(['udevadm', 'settle'])62 write(dev, True)63 run_command(['blockdev', '--setrw', dev])64 run_command(['udevadm', 'settle'])65 write(dev)66 run_command(['udevadm', 'settle'])67 run_command(['blockdev', '--setrw', dev])68 run_command(['udevadm', 'settle'])69 write(dev)70 log.info('cannot set device ro when in-use')71 with open(dev, 'r') as f:72 ret, _, _ = run_command(['blockdev', '--setro', dev], False)73 assert ret != 0, 'in-use device was set read-only!'74 run_command(['udevadm', 'settle'])75 run_command(['rbd', 'unmap', '/dev/rbd/rbd/' + image_name])76 run_command(['rbd', 'map', '--read-only', image_name])77 log.info('cannot write to newly mapped ro device')78 write(dev, True)79 log.info('can set ro mapped device rw')80 run_command(['blockdev', '--setrw', dev])81 run_command(['udevadm', 'settle'])82 write(dev)83def main():84 image_name = 'test1'85 # clean up any state from previous test runs86 teardown(image_name, False)87 setup(image_name)88 test_ro(image_name)89 teardown(image_name)90if __name__ == '__main__':...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run molecule automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful