How to use skipUnless method in avocado

Best Python code snippet using avocado_python

test_posix.py

Source:test_posix.py Github

copy

Full Screen

...41 posix_func = getattr(posix, name, None)42 if posix_func is not None:43 posix_func()44 self.assertRaises(TypeError, posix_func, 1)45 @unittest.skipUnless(hasattr(posix, 'getresuid'),46 'test needs posix.getresuid()')47 def test_getresuid(self):48 user_ids = posix.getresuid()49 self.assertEqual(len(user_ids), 3)50 for val in user_ids:51 self.assertGreaterEqual(val, 0)52 @unittest.skipUnless(hasattr(posix, 'getresgid'),53 'test needs posix.getresgid()')54 def test_getresgid(self):55 group_ids = posix.getresgid()56 self.assertEqual(len(group_ids), 3)57 for val in group_ids:58 self.assertGreaterEqual(val, 0)59 @unittest.skipUnless(hasattr(posix, 'setresuid'),60 'test needs posix.setresuid()')61 def test_setresuid(self):62 current_user_ids = posix.getresuid()63 self.assertIsNone(posix.setresuid(*current_user_ids))64 # -1 means don't change that value.65 self.assertIsNone(posix.setresuid(-1, -1, -1))66 @unittest.skipUnless(hasattr(posix, 'setresuid'),67 'test needs posix.setresuid()')68 def test_setresuid_exception(self):69 # Don't do this test if someone is silly enough to run us as root.70 current_user_ids = posix.getresuid()71 if 0 not in current_user_ids:72 new_user_ids = (current_user_ids[0]+1, -1, -1)73 self.assertRaises(OSError, posix.setresuid, *new_user_ids)74 @unittest.skipUnless(hasattr(posix, 'setresgid'),75 'test needs posix.setresgid()')76 def test_setresgid(self):77 current_group_ids = posix.getresgid()78 self.assertIsNone(posix.setresgid(*current_group_ids))79 # -1 means don't change that value.80 self.assertIsNone(posix.setresgid(-1, -1, -1))81 @unittest.skipUnless(hasattr(posix, 'setresgid'),82 'test needs posix.setresgid()')83 def test_setresgid_exception(self):84 # Don't do this test if someone is silly enough to run us as root.85 current_group_ids = posix.getresgid()86 if 0 not in current_group_ids:87 new_group_ids = (current_group_ids[0]+1, -1, -1)88 self.assertRaises(OSError, posix.setresgid, *new_group_ids)89 @unittest.skipUnless(hasattr(posix, 'initgroups'),90 "test needs os.initgroups()")91 def test_initgroups(self):92 # It takes a string and an integer; check that it raises a TypeError93 # for other argument lists.94 self.assertRaises(TypeError, posix.initgroups)95 self.assertRaises(TypeError, posix.initgroups, None)96 self.assertRaises(TypeError, posix.initgroups, 3, "foo")97 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())98 # If a non-privileged user invokes it, it should fail with OSError99 # EPERM.100 if os.getuid() != 0:101 try:102 name = pwd.getpwuid(posix.getuid()).pw_name103 except KeyError:104 # the current UID may not have a pwd entry105 raise unittest.SkipTest("need a pwd entry")106 try:107 posix.initgroups(name, 13)108 except OSError as e:109 self.assertEqual(e.errno, errno.EPERM)110 else:111 self.fail("Expected OSError to be raised by initgroups")112 @unittest.skipUnless(hasattr(posix, 'statvfs'),113 'test needs posix.statvfs()')114 def test_statvfs(self):115 self.assertTrue(posix.statvfs(os.curdir))116 @unittest.skipUnless(hasattr(posix, 'fstatvfs'),117 'test needs posix.fstatvfs()')118 def test_fstatvfs(self):119 fp = open(support.TESTFN)120 try:121 self.assertTrue(posix.fstatvfs(fp.fileno()))122 self.assertTrue(posix.statvfs(fp.fileno()))123 finally:124 fp.close()125 @unittest.skipUnless(hasattr(posix, 'ftruncate'),126 'test needs posix.ftruncate()')127 def test_ftruncate(self):128 fp = open(support.TESTFN, 'w+')129 try:130 # we need to have some data to truncate131 fp.write('test')132 fp.flush()133 posix.ftruncate(fp.fileno(), 0)134 finally:135 fp.close()136 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")137 def test_truncate(self):138 with open(support.TESTFN, 'w') as fp:139 fp.write('test')140 fp.flush()141 posix.truncate(support.TESTFN, 0)142 @unittest.skipUnless(getattr(os, 'execve', None) in os.supports_fd, "test needs execve() to support the fd parameter")143 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")144 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")145 def test_fexecve(self):146 fp = os.open(sys.executable, os.O_RDONLY)147 try:148 pid = os.fork()149 if pid == 0:150 os.chdir(os.path.split(sys.executable)[0])151 posix.execve(fp, [sys.executable, '-c', 'pass'], os.environ)152 else:153 self.assertEqual(os.waitpid(pid, 0), (pid, 0))154 finally:155 os.close(fp)156 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")157 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")158 def test_waitid(self):159 pid = os.fork()160 if pid == 0:161 os.chdir(os.path.split(sys.executable)[0])162 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)163 else:164 res = posix.waitid(posix.P_PID, pid, posix.WEXITED)165 self.assertEqual(pid, res.si_pid)166 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")167 def test_lockf(self):168 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)169 try:170 os.write(fd, b'test')171 os.lseek(fd, 0, os.SEEK_SET)172 posix.lockf(fd, posix.F_LOCK, 4)173 # section is locked174 posix.lockf(fd, posix.F_ULOCK, 4)175 finally:176 os.close(fd)177 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")178 def test_pread(self):179 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)180 try:181 os.write(fd, b'test')182 os.lseek(fd, 0, os.SEEK_SET)183 self.assertEqual(b'es', posix.pread(fd, 2, 1))184 # the first pread() shouldn't disturb the file offset185 self.assertEqual(b'te', posix.read(fd, 2))186 finally:187 os.close(fd)188 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")189 def test_pwrite(self):190 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)191 try:192 os.write(fd, b'test')193 os.lseek(fd, 0, os.SEEK_SET)194 posix.pwrite(fd, b'xx', 1)195 self.assertEqual(b'txxt', posix.read(fd, 4))196 finally:197 os.close(fd)198 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),199 "test needs posix.posix_fallocate()")200 def test_posix_fallocate(self):201 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)202 try:203 posix.posix_fallocate(fd, 0, 10)204 except OSError as inst:205 # issue10812, ZFS doesn't appear to support posix_fallocate,206 # so skip Solaris-based since they are likely to have ZFS.207 if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"):208 raise209 finally:210 os.close(fd)211 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),212 "test needs posix.posix_fadvise()")213 def test_posix_fadvise(self):214 fd = os.open(support.TESTFN, os.O_RDONLY)215 try:216 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)217 finally:218 os.close(fd)219 @unittest.skipUnless(os.utime in os.supports_fd, "test needs fd support in os.utime")220 def test_utime_with_fd(self):221 now = time.time()222 fd = os.open(support.TESTFN, os.O_RDONLY)223 try:224 posix.utime(fd)225 posix.utime(fd, None)226 self.assertRaises(TypeError, posix.utime, fd, (None, None))227 self.assertRaises(TypeError, posix.utime, fd, (now, None))228 self.assertRaises(TypeError, posix.utime, fd, (None, now))229 posix.utime(fd, (int(now), int(now)))230 posix.utime(fd, (now, now))231 self.assertRaises(ValueError, posix.utime, fd, (now, now), ns=(now, now))232 self.assertRaises(ValueError, posix.utime, fd, (now, 0), ns=(None, None))233 self.assertRaises(ValueError, posix.utime, fd, (None, None), ns=(now, 0))234 posix.utime(fd, (int(now), int((now - int(now)) * 1e9)))235 posix.utime(fd, ns=(int(now), int((now - int(now)) * 1e9)))236 finally:237 os.close(fd)238 @unittest.skipUnless(os.utime in os.supports_follow_symlinks, "test needs follow_symlinks support in os.utime")239 def test_utime_nofollow_symlinks(self):240 now = time.time()241 posix.utime(support.TESTFN, None, follow_symlinks=False)242 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), follow_symlinks=False)243 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), follow_symlinks=False)244 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), follow_symlinks=False)245 posix.utime(support.TESTFN, (int(now), int(now)), follow_symlinks=False)246 posix.utime(support.TESTFN, (now, now), follow_symlinks=False)247 posix.utime(support.TESTFN, follow_symlinks=False)248 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")249 def test_writev(self):250 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)251 try:252 n = os.writev(fd, (b'test1', b'tt2', b't3'))253 self.assertEqual(n, 10)254 os.lseek(fd, 0, os.SEEK_SET)255 self.assertEqual(b'test1tt2t3', posix.read(fd, 10))256 # Issue #20113: empty list of buffers should not crash257 try:258 size = posix.writev(fd, [])259 except OSError:260 # writev(fd, []) raises OSError(22, "Invalid argument")261 # on OpenIndiana262 pass263 else:264 self.assertEqual(size, 0)265 finally:266 os.close(fd)267 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")268 def test_readv(self):269 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)270 try:271 os.write(fd, b'test1tt2t3')272 os.lseek(fd, 0, os.SEEK_SET)273 buf = [bytearray(i) for i in [5, 3, 2]]274 self.assertEqual(posix.readv(fd, buf), 10)275 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])276 # Issue #20113: empty list of buffers should not crash277 try:278 size = posix.readv(fd, [])279 except OSError:280 # readv(fd, []) raises OSError(22, "Invalid argument")281 # on OpenIndiana282 pass283 else:284 self.assertEqual(size, 0)285 finally:286 os.close(fd)287 @unittest.skipUnless(hasattr(posix, 'dup'),288 'test needs posix.dup()')289 def test_dup(self):290 fp = open(support.TESTFN)291 try:292 fd = posix.dup(fp.fileno())293 self.assertIsInstance(fd, int)294 os.close(fd)295 finally:296 fp.close()297 @unittest.skipUnless(hasattr(posix, 'confstr'),298 'test needs posix.confstr()')299 def test_confstr(self):300 self.assertRaises(ValueError, posix.confstr, "CS_garbage")301 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)302 @unittest.skipUnless(hasattr(posix, 'dup2'),303 'test needs posix.dup2()')304 def test_dup2(self):305 fp1 = open(support.TESTFN)306 fp2 = open(support.TESTFN)307 try:308 posix.dup2(fp1.fileno(), fp2.fileno())309 finally:310 fp1.close()311 fp2.close()312 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")313 @support.requires_linux_version(2, 6, 23)314 def test_oscloexec(self):315 fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC)316 self.addCleanup(os.close, fd)317 self.assertFalse(os.get_inheritable(fd))318 @unittest.skipUnless(hasattr(posix, 'O_EXLOCK'),319 'test needs posix.O_EXLOCK')320 def test_osexlock(self):321 fd = os.open(support.TESTFN,322 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)323 self.assertRaises(OSError, os.open, support.TESTFN,324 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)325 os.close(fd)326 if hasattr(posix, "O_SHLOCK"):327 fd = os.open(support.TESTFN,328 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)329 self.assertRaises(OSError, os.open, support.TESTFN,330 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)331 os.close(fd)332 @unittest.skipUnless(hasattr(posix, 'O_SHLOCK'),333 'test needs posix.O_SHLOCK')334 def test_osshlock(self):335 fd1 = os.open(support.TESTFN,336 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)337 fd2 = os.open(support.TESTFN,338 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)339 os.close(fd2)340 os.close(fd1)341 if hasattr(posix, "O_EXLOCK"):342 fd = os.open(support.TESTFN,343 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)344 self.assertRaises(OSError, os.open, support.TESTFN,345 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)346 os.close(fd)347 @unittest.skipUnless(hasattr(posix, 'fstat'),348 'test needs posix.fstat()')349 def test_fstat(self):350 fp = open(support.TESTFN)351 try:352 self.assertTrue(posix.fstat(fp.fileno()))353 self.assertTrue(posix.stat(fp.fileno()))354 self.assertRaisesRegex(TypeError,355 'should be string, bytes, os.PathLike or integer, not',356 posix.stat, float(fp.fileno()))357 finally:358 fp.close()359 @unittest.skipUnless(hasattr(posix, 'stat'),360 'test needs posix.stat()')361 def test_stat(self):362 self.assertTrue(posix.stat(support.TESTFN))363 self.assertTrue(posix.stat(os.fsencode(support.TESTFN)))364 self.assertWarnsRegex(DeprecationWarning,365 'should be string, bytes, os.PathLike or integer, not',366 posix.stat, bytearray(os.fsencode(support.TESTFN)))367 self.assertRaisesRegex(TypeError,368 'should be string, bytes, os.PathLike or integer, not',369 posix.stat, None)370 self.assertRaisesRegex(TypeError,371 'should be string, bytes, os.PathLike or integer, not',372 posix.stat, list(support.TESTFN))373 self.assertRaisesRegex(TypeError,374 'should be string, bytes, os.PathLike or integer, not',375 posix.stat, list(os.fsencode(support.TESTFN)))376 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")377 @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")378 def test_mkfifo(self):379 support.unlink(support.TESTFN)380 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)381 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))382 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),383 "don't have mknod()/S_IFIFO")384 @unittest.skipIf(android_not_root, "mknod not allowed, non root user")385 def test_mknod(self):386 # Test using mknod() to create a FIFO (the only use specified387 # by POSIX).388 support.unlink(support.TESTFN)389 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR390 try:391 posix.mknod(support.TESTFN, mode, 0)392 except OSError as e:393 # Some old systems don't allow unprivileged users to use394 # mknod(), or only support creating device nodes.395 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))396 else:397 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))398 # Keyword arguments are also supported399 support.unlink(support.TESTFN)400 try:401 posix.mknod(path=support.TESTFN, mode=mode, device=0,402 dir_fd=None)403 except OSError as e:404 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))405 @unittest.skipUnless(hasattr(posix, 'stat'), 'test needs posix.stat()')406 @unittest.skipUnless(hasattr(posix, 'makedev'), 'test needs posix.makedev()')407 def test_makedev(self):408 st = posix.stat(support.TESTFN)409 dev = st.st_dev410 self.assertIsInstance(dev, int)411 self.assertGreaterEqual(dev, 0)412 major = posix.major(dev)413 self.assertIsInstance(major, int)414 self.assertGreaterEqual(major, 0)415 self.assertEqual(posix.major(dev), major)416 self.assertRaises(TypeError, posix.major, float(dev))417 self.assertRaises(TypeError, posix.major)418 self.assertRaises((ValueError, OverflowError), posix.major, -1)419 minor = posix.minor(dev)420 self.assertIsInstance(minor, int)421 self.assertGreaterEqual(minor, 0)422 self.assertEqual(posix.minor(dev), minor)423 self.assertRaises(TypeError, posix.minor, float(dev))424 self.assertRaises(TypeError, posix.minor)425 self.assertRaises((ValueError, OverflowError), posix.minor, -1)426 self.assertEqual(posix.makedev(major, minor), dev)427 self.assertRaises(TypeError, posix.makedev, float(major), minor)428 self.assertRaises(TypeError, posix.makedev, major, float(minor))429 self.assertRaises(TypeError, posix.makedev, major)430 self.assertRaises(TypeError, posix.makedev)431 def _test_all_chown_common(self, chown_func, first_param, stat_func):432 """Common code for chown, fchown and lchown tests."""433 def check_stat(uid, gid):434 if stat_func is not None:435 stat = stat_func(first_param)436 self.assertEqual(stat.st_uid, uid)437 self.assertEqual(stat.st_gid, gid)438 uid = os.getuid()439 gid = os.getgid()440 # test a successful chown call441 chown_func(first_param, uid, gid)442 check_stat(uid, gid)443 chown_func(first_param, -1, gid)444 check_stat(uid, gid)445 chown_func(first_param, uid, -1)446 check_stat(uid, gid)447 if uid == 0:448 # Try an amusingly large uid/gid to make sure we handle449 # large unsigned values. (chown lets you use any450 # uid/gid you like, even if they aren't defined.)451 #452 # This problem keeps coming up:453 # http://bugs.python.org/issue1747858454 # http://bugs.python.org/issue4591455 # http://bugs.python.org/issue15301456 # Hopefully the fix in 4591 fixes it for good!457 #458 # This part of the test only runs when run as root.459 # Only scary people run their tests as root.460 big_value = 2**31461 chown_func(first_param, big_value, big_value)462 check_stat(big_value, big_value)463 chown_func(first_param, -1, -1)464 check_stat(big_value, big_value)465 chown_func(first_param, uid, gid)466 check_stat(uid, gid)467 elif platform.system() in ('HP-UX', 'SunOS'):468 # HP-UX and Solaris can allow a non-root user to chown() to root469 # (issue #5113)470 raise unittest.SkipTest("Skipping because of non-standard chown() "471 "behavior")472 else:473 # non-root cannot chown to root, raises OSError474 self.assertRaises(OSError, chown_func, first_param, 0, 0)475 check_stat(uid, gid)476 self.assertRaises(OSError, chown_func, first_param, 0, -1)477 check_stat(uid, gid)478 if 0 not in os.getgroups():479 self.assertRaises(OSError, chown_func, first_param, -1, 0)480 check_stat(uid, gid)481 # test illegal types482 for t in str, float:483 self.assertRaises(TypeError, chown_func, first_param, t(uid), gid)484 check_stat(uid, gid)485 self.assertRaises(TypeError, chown_func, first_param, uid, t(gid))486 check_stat(uid, gid)487 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")488 def test_chown(self):489 # raise an OSError if the file does not exist490 os.unlink(support.TESTFN)491 self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1)492 # re-create the file493 support.create_empty_file(support.TESTFN)494 self._test_all_chown_common(posix.chown, support.TESTFN,495 getattr(posix, 'stat', None))496 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")497 def test_fchown(self):498 os.unlink(support.TESTFN)499 # re-create the file500 test_file = open(support.TESTFN, 'w')501 try:502 fd = test_file.fileno()503 self._test_all_chown_common(posix.fchown, fd,504 getattr(posix, 'fstat', None))505 finally:506 test_file.close()507 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")508 def test_lchown(self):509 os.unlink(support.TESTFN)510 # create a symlink511 os.symlink(_DUMMY_SYMLINK, support.TESTFN)512 self._test_all_chown_common(posix.lchown, support.TESTFN,513 getattr(posix, 'lstat', None))514 @unittest.skipUnless(hasattr(posix, 'chdir'), 'test needs posix.chdir()')515 def test_chdir(self):516 posix.chdir(os.curdir)517 self.assertRaises(OSError, posix.chdir, support.TESTFN)518 def test_listdir(self):519 self.assertTrue(support.TESTFN in posix.listdir(os.curdir))520 def test_listdir_default(self):521 # When listdir is called without argument,522 # it's the same as listdir(os.curdir).523 self.assertTrue(support.TESTFN in posix.listdir())524 def test_listdir_bytes(self):525 # When listdir is called with a bytes object,526 # the returned strings are of type bytes.527 self.assertTrue(os.fsencode(support.TESTFN) in posix.listdir(b'.'))528 @unittest.skipUnless(posix.listdir in os.supports_fd,529 "test needs fd support for posix.listdir()")530 def test_listdir_fd(self):531 f = posix.open(posix.getcwd(), posix.O_RDONLY)532 self.addCleanup(posix.close, f)533 self.assertEqual(534 sorted(posix.listdir('.')),535 sorted(posix.listdir(f))536 )537 # Check that the fd offset was reset (issue #13739)538 self.assertEqual(539 sorted(posix.listdir('.')),540 sorted(posix.listdir(f))541 )542 @unittest.skipUnless(hasattr(posix, 'access'), 'test needs posix.access()')543 def test_access(self):544 self.assertTrue(posix.access(support.TESTFN, os.R_OK))545 @unittest.skipUnless(hasattr(posix, 'umask'), 'test needs posix.umask()')546 def test_umask(self):547 old_mask = posix.umask(0)548 self.assertIsInstance(old_mask, int)549 posix.umask(old_mask)550 @unittest.skipUnless(hasattr(posix, 'strerror'),551 'test needs posix.strerror()')552 def test_strerror(self):553 self.assertTrue(posix.strerror(0))554 @unittest.skipUnless(hasattr(posix, 'pipe'), 'test needs posix.pipe()')555 def test_pipe(self):556 reader, writer = posix.pipe()557 os.close(reader)558 os.close(writer)559 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")560 @support.requires_linux_version(2, 6, 27)561 def test_pipe2(self):562 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')563 self.assertRaises(TypeError, os.pipe2, 0, 0)564 # try calling with flags = 0, like os.pipe()565 r, w = os.pipe2(0)566 os.close(r)567 os.close(w)568 # test flags569 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)570 self.addCleanup(os.close, r)571 self.addCleanup(os.close, w)572 self.assertFalse(os.get_inheritable(r))573 self.assertFalse(os.get_inheritable(w))574 self.assertFalse(os.get_blocking(r))575 self.assertFalse(os.get_blocking(w))576 # try reading from an empty pipe: this should fail, not block577 self.assertRaises(OSError, os.read, r, 1)578 # try a write big enough to fill-up the pipe: this should either579 # fail or perform a partial write, not block580 try:581 os.write(w, b'x' * support.PIPE_MAX_SIZE)582 except OSError:583 pass584 @support.cpython_only585 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")586 @support.requires_linux_version(2, 6, 27)587 def test_pipe2_c_limits(self):588 # Issue 15989589 import _testcapi590 self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)591 self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)592 @unittest.skipUnless(hasattr(posix, 'utime'), 'test needs posix.utime()')593 def test_utime(self):594 now = time.time()595 posix.utime(support.TESTFN, None)596 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))597 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))598 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))599 posix.utime(support.TESTFN, (int(now), int(now)))600 posix.utime(support.TESTFN, (now, now))601 def _test_chflags_regular_file(self, chflags_func, target_file, **kwargs):602 st = os.stat(target_file)603 self.assertTrue(hasattr(st, 'st_flags'))604 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.605 flags = st.st_flags | stat.UF_IMMUTABLE606 try:607 chflags_func(target_file, flags, **kwargs)608 except OSError as err:609 if err.errno != errno.EOPNOTSUPP:610 raise611 msg = 'chflag UF_IMMUTABLE not supported by underlying fs'612 self.skipTest(msg)613 try:614 new_st = os.stat(target_file)615 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)616 try:617 fd = open(target_file, 'w+')618 except OSError as e:619 self.assertEqual(e.errno, errno.EPERM)620 finally:621 posix.chflags(target_file, st.st_flags)622 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')623 def test_chflags(self):624 self._test_chflags_regular_file(posix.chflags, support.TESTFN)625 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')626 def test_lchflags_regular_file(self):627 self._test_chflags_regular_file(posix.lchflags, support.TESTFN)628 self._test_chflags_regular_file(posix.chflags, support.TESTFN, follow_symlinks=False)629 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')630 def test_lchflags_symlink(self):631 testfn_st = os.stat(support.TESTFN)632 self.assertTrue(hasattr(testfn_st, 'st_flags'))633 os.symlink(support.TESTFN, _DUMMY_SYMLINK)634 self.teardown_files.append(_DUMMY_SYMLINK)635 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)636 def chflags_nofollow(path, flags):637 return posix.chflags(path, flags, follow_symlinks=False)638 for fn in (posix.lchflags, chflags_nofollow):639 # ZFS returns EOPNOTSUPP when attempting to set flag UF_IMMUTABLE.640 flags = dummy_symlink_st.st_flags | stat.UF_IMMUTABLE641 try:642 fn(_DUMMY_SYMLINK, flags)643 except OSError as err:644 if err.errno != errno.EOPNOTSUPP:645 raise646 msg = 'chflag UF_IMMUTABLE not supported by underlying fs'647 self.skipTest(msg)648 try:649 new_testfn_st = os.stat(support.TESTFN)650 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)651 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)652 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,653 new_dummy_symlink_st.st_flags)654 finally:655 fn(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)656 def test_environ(self):657 if os.name == "nt":658 item_type = str659 else:660 item_type = bytes661 for k, v in posix.environ.items():662 self.assertEqual(type(k), item_type)663 self.assertEqual(type(v), item_type)664 @unittest.skipUnless(hasattr(posix, 'getcwd'), 'test needs posix.getcwd()')665 def test_getcwd_long_pathnames(self):666 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'667 curdir = os.getcwd()668 base_path = os.path.abspath(support.TESTFN) + '.getcwd'669 try:670 os.mkdir(base_path)671 os.chdir(base_path)672 except:673 # Just returning nothing instead of the SkipTest exception, because674 # the test results in Error in that case. Is that ok?675 # raise unittest.SkipTest("cannot create directory for testing")676 return677 def _create_and_do_getcwd(dirname, current_path_length = 0):678 try:679 os.mkdir(dirname)680 except:681 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")682 os.chdir(dirname)683 try:684 os.getcwd()685 if current_path_length < 1027:686 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)687 finally:688 os.chdir('..')689 os.rmdir(dirname)690 _create_and_do_getcwd(dirname)691 finally:692 os.chdir(curdir)693 support.rmtree(base_path)694 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")695 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")696 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")697 def test_getgrouplist(self):698 user = pwd.getpwuid(os.getuid())[0]699 group = pwd.getpwuid(os.getuid())[3]700 self.assertIn(group, posix.getgrouplist(user, group))701 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")702 def test_getgroups(self):703 with os.popen('id -G 2>/dev/null') as idg:704 groups = idg.read().strip()705 ret = idg.close()706 try:707 idg_groups = set(int(g) for g in groups.split())708 except ValueError:709 idg_groups = set()710 if ret is not None or not idg_groups:711 raise unittest.SkipTest("need working 'id -G'")712 # Issues 16698: OS X ABIs prior to 10.6 have limits on getgroups()713 if sys.platform == 'darwin':714 import sysconfig715 dt = sysconfig.get_config_var('MACOSX_DEPLOYMENT_TARGET') or '10.0'716 if tuple(int(n) for n in dt.split('.')[0:2]) < (10, 6):717 raise unittest.SkipTest("getgroups(2) is broken prior to 10.6")718 # 'id -G' and 'os.getgroups()' should return the same719 # groups, ignoring order, duplicates, and the effective gid.720 # #10822/#26944 - It is implementation defined whether721 # posix.getgroups() includes the effective gid.722 symdiff = idg_groups.symmetric_difference(posix.getgroups())723 self.assertTrue(not symdiff or symdiff == {posix.getegid()})724 # tests for the posix *at functions follow725 @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")726 def test_access_dir_fd(self):727 f = posix.open(posix.getcwd(), posix.O_RDONLY)728 try:729 self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f))730 finally:731 posix.close(f)732 @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")733 def test_chmod_dir_fd(self):734 os.chmod(support.TESTFN, stat.S_IRUSR)735 f = posix.open(posix.getcwd(), posix.O_RDONLY)736 try:737 posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)738 s = posix.stat(support.TESTFN)739 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)740 finally:741 posix.close(f)742 @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")743 def test_chown_dir_fd(self):744 support.unlink(support.TESTFN)745 support.create_empty_file(support.TESTFN)746 f = posix.open(posix.getcwd(), posix.O_RDONLY)747 try:748 posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f)749 finally:750 posix.close(f)751 @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")752 def test_stat_dir_fd(self):753 support.unlink(support.TESTFN)754 with open(support.TESTFN, 'w') as outfile:755 outfile.write("testline\n")756 f = posix.open(posix.getcwd(), posix.O_RDONLY)757 try:758 s1 = posix.stat(support.TESTFN)759 s2 = posix.stat(support.TESTFN, dir_fd=f)760 self.assertEqual(s1, s2)761 s2 = posix.stat(support.TESTFN, dir_fd=None)762 self.assertEqual(s1, s2)763 self.assertRaisesRegex(TypeError, 'should be integer or None, not',764 posix.stat, support.TESTFN, dir_fd=posix.getcwd())765 self.assertRaisesRegex(TypeError, 'should be integer or None, not',766 posix.stat, support.TESTFN, dir_fd=float(f))767 self.assertRaises(OverflowError,768 posix.stat, support.TESTFN, dir_fd=10**20)769 finally:770 posix.close(f)771 @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")772 def test_utime_dir_fd(self):773 f = posix.open(posix.getcwd(), posix.O_RDONLY)774 try:775 now = time.time()776 posix.utime(support.TESTFN, None, dir_fd=f)777 posix.utime(support.TESTFN, dir_fd=f)778 self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f)779 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f)780 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f)781 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f)782 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f)783 posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f)784 posix.utime(support.TESTFN, (now, now), dir_fd=f)785 posix.utime(support.TESTFN,786 (int(now), int((now - int(now)) * 1e9)), dir_fd=f)787 posix.utime(support.TESTFN, dir_fd=f,788 times=(int(now), int((now - int(now)) * 1e9)))789 # try dir_fd and follow_symlinks together790 if os.utime in os.supports_follow_symlinks:791 try:792 posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f)793 except ValueError:794 # whoops! using both together not supported on this platform.795 pass796 finally:797 posix.close(f)798 @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")799 @unittest.skipIf(android_not_root, "hard link not allowed, non root user")800 def test_link_dir_fd(self):801 f = posix.open(posix.getcwd(), posix.O_RDONLY)802 try:803 posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f)804 # should have same inodes805 self.assertEqual(posix.stat(support.TESTFN)[1],806 posix.stat(support.TESTFN + 'link')[1])807 finally:808 posix.close(f)809 support.unlink(support.TESTFN + 'link')810 @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")811 def test_mkdir_dir_fd(self):812 f = posix.open(posix.getcwd(), posix.O_RDONLY)813 try:814 posix.mkdir(support.TESTFN + 'dir', dir_fd=f)815 posix.stat(support.TESTFN + 'dir') # should not raise exception816 finally:817 posix.close(f)818 support.rmtree(support.TESTFN + 'dir')819 @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),820 "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")821 @unittest.skipIf(android_not_root, "mknod not allowed, non root user")822 def test_mknod_dir_fd(self):823 # Test using mknodat() to create a FIFO (the only use specified824 # by POSIX).825 support.unlink(support.TESTFN)826 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR827 f = posix.open(posix.getcwd(), posix.O_RDONLY)828 try:829 posix.mknod(support.TESTFN, mode, 0, dir_fd=f)830 except OSError as e:831 # Some old systems don't allow unprivileged users to use832 # mknod(), or only support creating device nodes.833 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))834 else:835 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))836 finally:837 posix.close(f)838 @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")839 def test_open_dir_fd(self):840 support.unlink(support.TESTFN)841 with open(support.TESTFN, 'w') as outfile:842 outfile.write("testline\n")843 a = posix.open(posix.getcwd(), posix.O_RDONLY)844 b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)845 try:846 res = posix.read(b, 9).decode(encoding="utf-8")847 self.assertEqual("testline\n", res)848 finally:849 posix.close(a)850 posix.close(b)851 @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")852 def test_readlink_dir_fd(self):853 os.symlink(support.TESTFN, support.TESTFN + 'link')854 f = posix.open(posix.getcwd(), posix.O_RDONLY)855 try:856 self.assertEqual(posix.readlink(support.TESTFN + 'link'),857 posix.readlink(support.TESTFN + 'link', dir_fd=f))858 finally:859 support.unlink(support.TESTFN + 'link')860 posix.close(f)861 @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")862 def test_rename_dir_fd(self):863 support.unlink(support.TESTFN)864 support.create_empty_file(support.TESTFN + 'ren')865 f = posix.open(posix.getcwd(), posix.O_RDONLY)866 try:867 posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f)868 except:869 posix.rename(support.TESTFN + 'ren', support.TESTFN)870 raise871 else:872 posix.stat(support.TESTFN) # should not raise exception873 finally:874 posix.close(f)875 @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")876 def test_symlink_dir_fd(self):877 f = posix.open(posix.getcwd(), posix.O_RDONLY)878 try:879 posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f)880 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)881 finally:882 posix.close(f)883 support.unlink(support.TESTFN + 'link')884 @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")885 def test_unlink_dir_fd(self):886 f = posix.open(posix.getcwd(), posix.O_RDONLY)887 support.create_empty_file(support.TESTFN + 'del')888 posix.stat(support.TESTFN + 'del') # should not raise exception889 try:890 posix.unlink(support.TESTFN + 'del', dir_fd=f)891 except:892 support.unlink(support.TESTFN + 'del')893 raise894 else:895 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')896 finally:897 posix.close(f)898 @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")899 @unittest.skipIf(android_not_root, "mkfifo not allowed, non root user")900 def test_mkfifo_dir_fd(self):901 support.unlink(support.TESTFN)902 f = posix.open(posix.getcwd(), posix.O_RDONLY)903 try:904 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)905 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))906 finally:907 posix.close(f)908 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),909 "don't have scheduling support")910 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),911 "don't have sched affinity support")912 @requires_sched_h913 def test_sched_yield(self):914 # This has no error conditions (at least on Linux).915 posix.sched_yield()916 @requires_sched_h917 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),918 "requires sched_get_priority_max()")919 def test_sched_priority(self):920 # Round-robin usually has interesting priorities.921 pol = posix.SCHED_RR922 lo = posix.sched_get_priority_min(pol)923 hi = posix.sched_get_priority_max(pol)924 self.assertIsInstance(lo, int)925 self.assertIsInstance(hi, int)926 self.assertGreaterEqual(hi, lo)927 # OSX evidently just returns 15 without checking the argument.928 if sys.platform != "darwin":929 self.assertRaises(OSError, posix.sched_get_priority_min, -23)930 self.assertRaises(OSError, posix.sched_get_priority_max, -23)931 @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")932 def test_get_and_set_scheduler_and_param(self):933 possible_schedulers = [sched for name, sched in posix.__dict__.items()934 if name.startswith("SCHED_")]935 mine = posix.sched_getscheduler(0)936 self.assertIn(mine, possible_schedulers)937 try:938 parent = posix.sched_getscheduler(os.getppid())939 except OSError as e:940 if e.errno != errno.EPERM:941 raise942 else:943 self.assertIn(parent, possible_schedulers)944 self.assertRaises(OSError, posix.sched_getscheduler, -1)945 self.assertRaises(OSError, posix.sched_getparam, -1)946 param = posix.sched_getparam(0)947 self.assertIsInstance(param.sched_priority, int)948 # POSIX states that calling sched_setparam() or sched_setscheduler() on949 # a process with a scheduling policy other than SCHED_FIFO or SCHED_RR950 # is implementation-defined: NetBSD and FreeBSD can return EINVAL.951 if not sys.platform.startswith(('freebsd', 'netbsd')):952 try:953 posix.sched_setscheduler(0, mine, param)954 posix.sched_setparam(0, param)955 except OSError as e:956 if e.errno != errno.EPERM:957 raise958 self.assertRaises(OSError, posix.sched_setparam, -1, param)959 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)960 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)961 self.assertRaises(TypeError, posix.sched_setparam, 0, 43)962 param = posix.sched_param(None)963 self.assertRaises(TypeError, posix.sched_setparam, 0, param)964 large = 214748364700965 param = posix.sched_param(large)966 self.assertRaises(OverflowError, posix.sched_setparam, 0, param)967 param = posix.sched_param(sched_priority=-large)968 self.assertRaises(OverflowError, posix.sched_setparam, 0, param)969 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")970 def test_sched_rr_get_interval(self):971 try:972 interval = posix.sched_rr_get_interval(0)973 except OSError as e:974 # This likely means that sched_rr_get_interval is only valid for975 # processes with the SCHED_RR scheduler in effect.976 if e.errno != errno.EINVAL:977 raise978 self.skipTest("only works on SCHED_RR processes")979 self.assertIsInstance(interval, float)980 # Reasonable constraints, I think.981 self.assertGreaterEqual(interval, 0.)982 self.assertLess(interval, 1.)983 @requires_sched_affinity984 def test_sched_getaffinity(self):985 mask = posix.sched_getaffinity(0)986 self.assertIsInstance(mask, set)987 self.assertGreaterEqual(len(mask), 1)988 self.assertRaises(OSError, posix.sched_getaffinity, -1)989 for cpu in mask:990 self.assertIsInstance(cpu, int)991 self.assertGreaterEqual(cpu, 0)992 self.assertLess(cpu, 1 << 32)993 @requires_sched_affinity994 def test_sched_setaffinity(self):995 mask = posix.sched_getaffinity(0)996 if len(mask) > 1:997 # Empty masks are forbidden998 mask.pop()999 posix.sched_setaffinity(0, mask)1000 self.assertEqual(posix.sched_getaffinity(0), mask)1001 self.assertRaises(OSError, posix.sched_setaffinity, 0, [])1002 self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])1003 self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])1004 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)1005 def test_rtld_constants(self):1006 # check presence of major RTLD_* constants1007 posix.RTLD_LAZY1008 posix.RTLD_NOW1009 posix.RTLD_GLOBAL1010 posix.RTLD_LOCAL1011 @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),1012 "test needs an OS that reports file holes")1013 def test_fs_holes(self):1014 # Even if the filesystem doesn't report holes,1015 # if the OS supports it the SEEK_* constants1016 # will be defined and will have a consistent1017 # behaviour:1018 # os.SEEK_DATA = current position1019 # os.SEEK_HOLE = end of file position1020 with open(support.TESTFN, 'r+b') as fp:1021 fp.write(b"hello")1022 fp.flush()1023 size = fp.tell()1024 fno = fp.fileno()1025 try :1026 for i in range(size):1027 self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))1028 self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))1029 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)1030 self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)1031 except OSError :1032 # Some OSs claim to support SEEK_HOLE/SEEK_DATA1033 # but it is not true.1034 # For instance:1035 # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html1036 raise unittest.SkipTest("OSError raised!")1037 def test_path_error2(self):1038 """1039 Test functions that call path_error2(), providing two filenames in their exceptions.1040 """1041 for name in ("rename", "replace", "link"):1042 function = getattr(os, name, None)1043 if function is None:1044 continue1045 for dst in ("noodly2", support.TESTFN):1046 try:1047 function('doesnotexistfilename', dst)1048 except OSError as e:1049 self.assertIn("'doesnotexistfilename' -> '{}'".format(dst), str(e))1050 break1051 else:1052 self.fail("No valid path_error2() test for os." + name)1053 def test_path_with_null_character(self):1054 fn = support.TESTFN1055 fn_with_NUL = fn + '\0'1056 self.addCleanup(support.unlink, fn)1057 support.unlink(fn)1058 fd = None1059 try:1060 with self.assertRaises(ValueError):1061 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises1062 finally:1063 if fd is not None:1064 os.close(fd)1065 self.assertFalse(os.path.exists(fn))1066 self.assertRaises(ValueError, os.mkdir, fn_with_NUL)1067 self.assertFalse(os.path.exists(fn))1068 open(fn, 'wb').close()1069 self.assertRaises(ValueError, os.stat, fn_with_NUL)1070 def test_path_with_null_byte(self):1071 fn = os.fsencode(support.TESTFN)1072 fn_with_NUL = fn + b'\0'1073 self.addCleanup(support.unlink, fn)1074 support.unlink(fn)1075 fd = None1076 try:1077 with self.assertRaises(ValueError):1078 fd = os.open(fn_with_NUL, os.O_WRONLY | os.O_CREAT) # raises1079 finally:1080 if fd is not None:1081 os.close(fd)1082 self.assertFalse(os.path.exists(fn))1083 self.assertRaises(ValueError, os.mkdir, fn_with_NUL)1084 self.assertFalse(os.path.exists(fn))1085 open(fn, 'wb').close()1086 self.assertRaises(ValueError, os.stat, fn_with_NUL)1087class PosixGroupsTester(unittest.TestCase):1088 def setUp(self):1089 if posix.getuid() != 0:1090 raise unittest.SkipTest("not enough privileges")1091 if not hasattr(posix, 'getgroups'):1092 raise unittest.SkipTest("need posix.getgroups")1093 if sys.platform == 'darwin':1094 raise unittest.SkipTest("getgroups(2) is broken on OSX")1095 self.saved_groups = posix.getgroups()1096 def tearDown(self):1097 if hasattr(posix, 'setgroups'):1098 posix.setgroups(self.saved_groups)1099 elif hasattr(posix, 'initgroups'):1100 name = pwd.getpwuid(posix.getuid()).pw_name1101 posix.initgroups(name, self.saved_groups[0])1102 @unittest.skipUnless(hasattr(posix, 'initgroups'),1103 "test needs posix.initgroups()")1104 def test_initgroups(self):1105 # find missing group1106 g = max(self.saved_groups or [0]) + 11107 name = pwd.getpwuid(posix.getuid()).pw_name1108 posix.initgroups(name, g)1109 self.assertIn(g, posix.getgroups())1110 @unittest.skipUnless(hasattr(posix, 'setgroups'),1111 "test needs posix.setgroups()")1112 def test_setgroups(self):1113 for groups in [[0], list(range(16))]:1114 posix.setgroups(groups)1115 self.assertListEqual(groups, posix.getgroups())1116def test_main():1117 try:1118 support.run_unittest(PosixTester, PosixGroupsTester)1119 finally:1120 support.reap_children()1121if __name__ == '__main__':...

Full Screen

Full Screen

test_match.py

Source:test_match.py Github

copy

Full Screen

...56 matched_identical3 = match.Match(med2a, med2, 0.4)57 matched_potential4 = match.Match(med1b, med2, 0.5)58 matched_potential4_rev = match.Match(med2, med1b, 0.5)59 matched_identical4 = match.Match(med2a, med2, 0.5)60 @unittest.skipUnless(test_objects, 'missing test_match data')61 def test_potential_match_eq1(self):62 "Test that a newly-instantiated potential match is equivalent to our baseline."63 self.assertEqual(self.matched_potential1, self.test_objects['matched_potential'])64 @unittest.skipUnless(test_objects, 'missing test_match data')65 def test_potential_match_eq2(self):66 "Test that a newly-instantiated potential match with meds in reverse order is equivalent to our baseline."67 self.assertEqual(self.matched_potential2, self.test_objects['matched_potential'])68 69 @unittest.skipUnless(test_objects, 'missing test_match data')70 def test_potential_match_ne1(self):71 "Test that a newly-instantiated potential match is not equivalent to our baseline (certainty differs)."72 self.assertNotEqual(self.matched_potential3, self.test_objects['matched_potential'])73 74 @unittest.skipUnless(test_objects, 'missing test_match data')75 def test_potential_match_ne2(self):76 "Test that a newly-instantiated potential match is not equivalent to our baseline (dosage differs)."77 self.assertNotEqual(self.matched_potential4, self.test_objects['matched_potential'])78 79 @unittest.skipUnless(test_objects, 'missing test_match data')80 def test_potential_match_ne3(self):81 "Test that a newly-instantiated potential match with meds in reverse order is not equivalent to our baseline."82 self.assertNotEqual(self.matched_potential4_rev, self.test_objects['matched_potential'])83 84 @unittest.skipUnless(test_objects, 'missing test_match data')85 def test_identical_match_eq1(self):86 "Test that a newly-instantiated identical match is equivalent to our baseline."87 self.assertEqual(self.matched_identical1, self.test_objects['matched_identical'])88 @unittest.skipUnless(test_objects, 'missing test_match data')89 def test_identical_match_eq2(self):90 "Test that a newly-instantiated identical match with meds in reverse order is equivalent to our baseline."91 self.assertEqual(self.matched_identical2, self.test_objects['matched_identical'])92 @unittest.skipUnless(test_objects, 'missing test_match data')93 def test_by_string_dictionary(self):94 "Test that the dictionary from a by-string match is as we expect."95 self.assertEqual(rmIdsFromMatchDict(self.matched_by_string.as_dictionary()),96 rmIdsFromMatchDict(self.test_objects['matched_by_string'].as_dictionary()))97 98 @unittest.skipUnless(test_objects, 'missing test_match data')99 def test_by_brand_name_dictionary(self):100 "Test that the dictionary from a by-brand-name match is as we expect."101 self.assertEqual(rmIdsFromMatchDict(self.matched_by_brand_name.as_dictionary()),102 rmIdsFromMatchDict(self.test_objects['matched_by_brand_name'].as_dictionary()))103 @unittest.skipUnless(test_objects, 'missing test_match data')104 def test_by_ingredients_dictionary(self):105 "Test that the dictionary from a by-ingredients match is as we expect."106 self.assertEqual(rmIdsFromMatchDict(self.matched_by_ingredients.as_dictionary()),107 rmIdsFromMatchDict(self.test_objects['matched_by_ingredients'].as_dictionary()))108 109 @unittest.skipUnless(test_objects, 'missing test_match data')110 def test_by_treatment_dictionary(self):111 "Test that the dictionary from a by-treatment match is as we expect."112 self.assertEqual(rmIdsFromMatchDict(self.matched_by_treatment.as_dictionary()),113 rmIdsFromMatchDict(self.test_objects['matched_by_treatment'].as_dictionary()))114 115 @unittest.skipUnless(test_objects, 'missing test_match data')116 def test_unspecified_dictionary(self):117 "Test that a dictionary from a match by unspecified mechanism is as we expect."118 self.assertEqual(rmIdsFromMatchDict(self.matched_unspecified.as_dictionary()),119 rmIdsFromMatchDict(self.test_objects['matched_unspecified'].as_dictionary()))120 121class TestFunctions(unittest.TestCase):122 """A set of unit tests to exercise the functions in the 'match' module. 123 """124 medString1 = 'Lisinopril 5 MG Tablet;TAKE TABLET TWICE DAILY; Rx'125 pMed1 = ParsedMedication(medString1, mappings)126 pMed1CUIs = set(['C0065374'])127 pMed1Tradenames = ['C0591228', 'C0591229', 'C0678140', 'C0701176', 'C0722805', 'C1602677', 'C2368718', 'C2368722', 'C2368725']128 medString2 = 'Pramipexole 0.5 MG Tablet;TAKE 1 TABLET 3 TIMES DAILY.; Rx'129 pMed2 = ParsedMedication(medString2, mappings)130 pMed2CUIs = set(['C0074710'])131 pMed2Tradenames = ['C0721754']132 medString2a = 'PRAMIPEXOLE 0.5 MG TABLET;take 1 tablet 3 times daily.; rx'133 pMed2a = ParsedMedication(medString2a, mappings)134 pMed2aCUIs = set(['C0074710'])135 pMed2aTradenames = ['C0721754']136 medString2b = 'Mirapex 0.5 MG Tablet;TAKE 1 TABLET 3 TIMES DAILY.; Rx'137 pMed2b = ParsedMedication(medString2b, mappings)138 pMed2bCUIs = set(['C0721754'])139 pMed2bTradenames = []140 medString3 = 'Warfarin Sodium 2.5 MG Tablet;TAKE AS DIRECTED.; Rx'141 pMed3 = ParsedMedication(medString3, mappings)142 pMed3CUIs = set(['C0376218'])143 pMed3Tradenames = []144 medString4 = 'Protonix 40 MG Tablet Delayed Release;TAKE 1 TABLET DAILY.; Rx'145 pMed4 = ParsedMedication(medString4, mappings)146 medString5 = 'Pantoprazole Sodium 40 MG Tablet Delayed Release;TAKE 1 TABLET DAILY.; Rx'147 pMed5 = ParsedMedication(medString5, mappings)148 medString6 = 'Paroxetine 20 MG Tablet; TAKE 1 TABLET DAILY.; Rx'149 pMed6 = ParsedMedication(medString6, mappings)150 medString7 = 'Sertraline 50 MG Tablet;TAKE 1 TABLET BY MOUTH EVERY DAY; Rx'151 pMed7 = ParsedMedication(medString7, mappings)152 medString8 = 'Razadyne 16 MG Capsule Extended Release 24 Hour;TAKE 1 CAPSULE DAILY IN THE MORNING take with meal daily; Rx.'153 pMed8 = ParsedMedication(medString8, mappings)154 medString9 = 'Cyclandelate 16 MG Capsule Extended Release 24 Hour;TAKE 1 CAPSULE DAILY IN THE MORNING take with meal daily; Rx.'155 pMed9 = ParsedMedication(medString9, mappings)156 medString10 = 'docosahexaenoic acid 200 mg capsules; one cap BID'157 pMed10 = ParsedMedication(medString10, mappings)158 medString11 = 'Exelon 4.6 MG/24HR Transdermal Patch 24 Hour;APPLY 1 PATCH DAILY AS DIRECTED.; Rx'159 pMed11 = ParsedMedication(medString11, mappings)160 list1 = [pMed1, pMed2]161 list1rev = [pMed2, pMed1]162 list2 = [pMed2a, pMed3]163 list2rev = [pMed3, pMed2a]164 list3 = [pMed2b, pMed3]165 list3rev = [pMed3, pMed2b]166 medication_list_test_CUIs = [pMed1CUIs, pMed2CUIs, pMed2aCUIs, pMed3CUIs, pMed2bCUIs, pMed3CUIs]167 medication_list_test_tradenames = [pMed1Tradenames, pMed2Tradenames, pMed2aTradenames, pMed3Tradenames, pMed2bTradenames, pMed3Tradenames]168 test_objects = None169 if test_match_objects:170 test_objects = test_match_objects['TestFunctions']171 matched_by_strings = match.match_by_strings(list1, list2)172 matched_by_strings_rev = match.match_by_strings(list1rev, list2rev)173 if mappings:174 matched_by_brand_name1 = match.match_by_brand_name(list1, list3)175 matched_by_brand_name1_rev = match.match_by_brand_name(list1rev, list3rev)176 matched_by_brand_name2 = match.match_by_brand_name(list3, list1)177 matched_by_ingredients_above = match.match_by_ingredients([pMed4], [pMed5], min_match_threshold=0.6)178 matched_by_ingredients_below = match.match_by_ingredients([pMed4], [pMed5], min_match_threshold=0.7)179 matched_by_ingredients_rev_above = match.match_by_ingredients([pMed5], [pMed4], min_match_threshold=0.6)180 matched_by_ingredients_rev_below = match.match_by_ingredients([pMed5], [pMed4], min_match_threshold=0.7)181 if mappings.treatment:182 matched_by_treatment_above = match.match_by_treatment([pMed6], [pMed7], mappings, match_acceptance_threshold=0.3)183 matched_by_treatment_below = match.match_by_treatment([pMed6], [pMed7], mappings)184 matched_by_treatment_05_yes = match.match_by_treatment([pMed8], [pMed9], mappings, match_acceptance_threshold=0.5)185 matched_by_treatment_05_no = match.match_by_treatment([pMed8], [pMed9], mappings, match_acceptance_threshold=0.51)186 matched_by_treatment_04_yes = match.match_by_treatment([pMed10], [pMed11], mappings, match_acceptance_threshold=0.4)187 matched_by_treatment_04_no = match.match_by_treatment([pMed10], [pMed11], mappings, match_acceptance_threshold=0.43)188 # Use the demo lists for testing; this code was previously in TestMatchResult189 demo_list_1 = [pm for pm in190 [make_medication(x, mappings, "List 1") for x in constants.demo_list_1]191 if isinstance(pm, ParsedMedication)]192 demo_list_2 = [pm for pm in193 [make_medication(x, mappings, "List 2") for x in constants.demo_list_2]194 if isinstance(pm, ParsedMedication)]195 demo_matched_by_strings = match.match_by_strings(demo_list_1, demo_list_2)196 demo_matched_by_strings_rev = match.match_by_strings(demo_list_2, demo_list_1)197 if mappings:198 demo_matched_by_brand_name = match.match_by_brand_name(demo_list_1, demo_list_2)199 demo_matched_by_brand_name_rev = match.match_by_brand_name(demo_list_2, demo_list_1)200 demo_matched_by_ingredients = match.match_by_ingredients(demo_list_1, demo_list_2)201 demo_matched_by_ingredients_rev = match.match_by_ingredients(demo_list_2, demo_list_1)202 @unittest.skipUnless(test_objects, 'missing test_match data')203 def test_match_by_strings(self):204 "Test that the MatchResult from a by-string match contains the lists we expect."205 self.assertEqual(self.matched_by_strings, self.test_objects['matched_by_strings'])206 @unittest.skipUnless(test_objects, 'missing test_match data')207 def test_match_by_strings_rev(self):208 """Test that the MatchResult from a by-string match is order-independent209 with respect to the order the medication lists are passed in."""210 self.assertEqual(self.matched_by_strings_rev, self.test_objects['matched_by_strings_rev'])211 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')212 def test_medication_list_CUIs(self):213 "Test the operation of match.medication_list_CUIs()"214 cuis = match.medication_list_CUIs(self.list1 + self.list2 + self.list3)215 self.assertEqual(cuis, self.medication_list_test_CUIs)216 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')217 @unittest.skipUnless(test_objects, 'missing test_match data')218 def test_medication_list_tradenames(self):219 "Test the operation of match.medication_list_tradenames()"220 tradenames = match.medication_list_tradenames(self.list1 + self.list2 + self.list3)221 self.assertEqual(tradenames, self.medication_list_test_tradenames)222 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')223 @unittest.skipUnless(test_objects, 'missing test_match data')224 def test_match_by_brand_name1(self):225 """Test that the MatchResult from a by-brand-name match contains the 226 lists we expect."""227 self.assertEqual(self.matched_by_brand_name1, self.test_objects['matched_by_brand_name1'])228 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')229 @unittest.skipUnless(test_objects, 'missing test_match data')230 def test_match_by_brand_name1_rev(self):231 """Test that the MatchResult from a by-brand-name match is order-independent232 with respect to the order the medication lists are passed in."""233 self.assertEqual(self.matched_by_brand_name1_rev, self.test_objects['matched_by_brand_name1_rev'])234 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')235 @unittest.skipUnless(test_objects, 'missing test_match data')236 def test_match_by_brand_name2(self):237 """Test that the MatchResult from a by-brand-name match contains the238 lists we expect."""239 self.assertEqual(self.matched_by_brand_name2, self.test_objects['matched_by_brand_name2'])240 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')241 @unittest.skipUnless(test_objects, 'missing test_match data')242 def test_match_by_ingredients_above(self):243 """Test reconcilation of medications by treatment intent that should244 match at a threshold of 0.6."""245 self.assertEqual(self.matched_by_ingredients_above, self.test_objects['matched_by_ingredients_above'])246 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')247 @unittest.skipUnless(test_objects, 'missing test_match data')248 def test_match_by_ingredients_below(self):249 """Test reconcilation of medications by treatment intent that should250 not match at a threshold of 0.7."""251 self.assertEqual(self.matched_by_ingredients_below, self.test_objects['matched_by_ingredients_below'])252 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')253 @unittest.skipUnless(test_objects, 'missing test_match data')254 def test_match_by_ingredients_rev_above(self):255 """Test order independence of the reconcilation of medications by 256 treatment intent that should match at a threshold of 0.6."""257 self.assertEqual(self.matched_by_ingredients_rev_above, self.test_objects['matched_by_ingredients_rev_above'])258 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')259 @unittest.skipUnless(test_objects, 'missing test_match data')260 def test_match_by_ingredients_rev_below(self):261 """Test order independence of the reconcilation of medications by 262 treatment intent that should not match at a threshold of 0.7."""263 self.assertEqual(self.matched_by_ingredients_rev_below, self.test_objects['matched_by_ingredients_rev_below'])264 @unittest.skipUnless(test_objects, 'missing test_objects data')265 def test_demo_match_by_strings(self):266 """Use demo lists to test matching by strings."""267 self.assertEqual(self.demo_matched_by_strings, self.test_objects['demo_matched_by_strings'])268 @unittest.skipUnless(test_objects, 'missing test_objects data')269 def test_demo_match_by_strings_rev(self):270 """Use demo lists to test order independence of matching by strings."""271 self.assertEqual(self.demo_matched_by_strings_rev, self.test_objects['demo_matched_by_strings_rev'])272 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')273 @unittest.skipUnless(mappings and test_objects, 'missing MappingContext with RXNORM data')274 def test_demo_match_by_brand_name(self):275 """Use demo lists to test matching by brand names."""276 self.assertEqual(self.demo_matched_by_brand_name, self.test_objects['demo_matched_by_brand_name'])277 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')278 @unittest.skipUnless(mappings and test_objects, 'missing MappingContext with RXNORM data')279 def test_demo_match_by_brand_name_rev(self):280 """Use demo lists to test order independence of matching by brand names."""281 self.assertEqual(self.demo_matched_by_brand_name_rev, self.test_objects['demo_matched_by_brand_name_rev'])282 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')283 @unittest.skipUnless(mappings and test_objects, 'missing MappingContext with RXNORM data')284 def test_demo_match_by_ingredients_list(self):285 """Use demo lists to test matching by ingredients."""286 self.assertEqual(self.demo_matched_by_ingredients, self.test_objects['demo_matched_by_ingredients'])287 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')288 @unittest.skipUnless(test_objects, 'missing test_match data')289 def test_demo_match_by_ingredients_list_rev(self):290 """Use demo lists to test order independence of matching by ingredients."""291 self.assertEqual(self.demo_matched_by_ingredients_rev, self.test_objects['demo_matched_by_ingredients_rev'])292 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')293 @unittest.skipUnless(mappings and mappings.treatment, 'MappingContext lacks treatment data')294 def test_match_by_treatment_above(self):295 """These two medications should match by treatment if the 296 match_acceptance_threshold is set to 0.3; note that this297 behavior may change as the underlying 'treats' data change."""298 self.assertEqual(len(self.matched_by_treatment_above.reconciled), 1)299 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')300 @unittest.skipUnless(mappings and mappings.treatment, 'MappingContext lacks treatment data')301 def test_match_by_treatment_below(self):302 """These two medications should not match by treatment if the303 match_acceptance_threshold is set to default (0.5); note that this304 behavior may change as the underlying 'treats' data change."""305 self.assertEqual(len(self.matched_by_treatment_below.reconciled), 0)306 @unittest.skipUnless(mappings, 'missing MappingContext with RXNORM data')307 @unittest.skipUnless(mappings and mappings.treatment, 'MappingContext lacks treatment data')308 def test_match_by_treatment_varies(self):309 """Test matching by treatment intent, varying thresholds to induce310 matches and non-matches on the same two sets of medication lists.311 """312 self.assertEqual(len(self.matched_by_treatment_05_yes.reconciled), 1)313 self.assertEqual(len(self.matched_by_treatment_05_no.reconciled), 0)314 self.assertEqual(len(self.matched_by_treatment_04_yes.reconciled), 1)315 self.assertEqual(len(self.matched_by_treatment_04_no.reconciled), 0)316class TestMatchResult(unittest.TestCase):317 """A set of unit tests to exercise the match.Match class. The MatchResult 318 class gets exercised a lot above, so we'll implement only a basic test319 of the members of the class.320 """321 med1 = ParsedMedication(TestFunctions.medString1, mappings)322 med2a = ParsedMedication(TestFunctions.medString2, mappings)323 med2b = ParsedMedication(TestFunctions.medString2, mappings)324 med3 = ParsedMedication(TestFunctions.medString3, mappings)325 rec_med = match.Match(med2a, med2b)326 basic_match_result = match.MatchResult([med1], [med3], [rec_med])327 test_objects = None328 if test_match_objects:329 test_objects = test_match_objects['TestMatchResult']330 @unittest.skipUnless(test_objects, 'missing test_match data')331 def test_basic(self):332 "Basic test of MatchResult functionality."333 self.assertEqual(self.basic_match_result, self.test_objects['basic_match_result'])334loader = unittest.TestLoader()335allTestsSuite = unittest.TestSuite()336allTestsSuite.addTests(loader.loadTestsFromTestCase(TestMatch))337allTestsSuite.addTests(loader.loadTestsFromTestCase(TestFunctions))338allTestsSuite.addTests(loader.loadTestsFromTestCase(TestMatchResult))339if __name__ == "__main__":...

Full Screen

Full Screen

test_connect_api.py

Source:test_connect_api.py Github

copy

Full Screen

...15class ConnectApiClientTest(unittest.TestCase):16 _multiprocess_can_split_ = True17 expected_riskrule_keys = set(['name', 'description', 'count',18 'criticality', 'criticalityLabel'])19 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)20 def test_domain_search(self):21 client = ConnectApiClient()22 resp = client.search_domains()23 self.assertIsInstance(resp, ConnectApiResponse)24 self.assertIsInstance(resp.entities, types.GeneratorType)25 first_entity = next(resp.entities)26 self.assertIsInstance(first_entity, DotAccessDict)27 self.assertEquals(first_entity.id, first_entity.entity.id)28 self.assertIsInstance(resp.returned_count, int)29 self.assertIsInstance(resp.total_count, long)30 self.assertGreater(resp.returned_count, 0)31 def check_list(self, resp):32 self.assertIsInstance(resp, list)33 keys = set(resp[0].keys())34 for e in self.expected_riskrule_keys:35 self.assertIn(e, keys, "Evidence should contain the key=%s" % e)36 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)37 def test_ip_riskrule(self):38 client = ConnectApiClient()39 resp = client.get_ip_riskrules()40 self.check_list(resp)41 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)42 def test_domain_riskrule(self):43 client = ConnectApiClient()44 resp = client.get_domain_riskrules()45 self.check_list(resp)46 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)47 def test_hash_riskrule(self):48 client = ConnectApiClient()49 resp = client.get_hash_riskrules()50 self.check_list(resp)51 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)52 def test_vuln_risklist(self):53 client = ConnectApiClient()54 resp = client.get_vulnerability_risklist(gzip=False)55 itr = resp.iter_lines()56 header = next(itr)57 self.assertIsInstance(header, basestring)58 entries = list(itr)59 self.assertGreater(len(entries), 10)60 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)61 def test_vuln_risklist_gzip(self):62 """download gzip and write to a byte buffer"""63 client = ConnectApiClient()64 resp = client.get_vulnerability_risklist(gzip=True)65 buf = io.BytesIO()66 for itr in resp.iter_content(chunk_size=1024):67 buf.write(itr)68 buf.seek(0)69 self.assertGreater(len(buf.read()), 1000)70 buf.close()71 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)72 def test_vuln_riskrule(self):73 client = ConnectApiClient()74 resp = client.get_vulnerability_riskrules()75 self.check_list(resp)76 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)77 def test_get_search(self):78 client = ConnectApiClient()79 resp = client.search("ip", **dict(risk_score="(91,100]",80 direction='desc'))81 self.assertIsInstance(resp, ConnectApiResponse)82 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)83 def test_get_ip(self):84 client = ConnectApiClient()85 resp = client.lookup_ip("8.8.8.8")86 self.assertIsInstance(resp, DotAccessDict)87 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)88 def test_get_extension(self):89 client = ConnectApiClient()90 info = client.get_extension_info("vulnerability", "CVE-2014-0160",91 "shodan")92 self.assertIsInstance(info, DotAccessDict)93 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)94 def test_vulnerabilty_extension(self):95 client = ConnectApiClient()96 info = client.get_vulnerability_extension("CVE-2014-0160", "shodan")97 self.assertIsInstance(info, DotAccessDict)98 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)99 def test_ip_demoevents(self):100 client = ConnectApiClient()101 res = client.get_ip_demoevents(limit=1)102 # pylint: disable=anomalous-backslash-in-string103 pattern = '.+ \[127.0.0.1\] \[localhost\]: ' \104 'NetScreen device_id=netscreen2 ' \105 '\[Root\]system-notification-00257\(traffic\): ' \106 'start_time=".+" duration=0 policy_id=320001 ' \107 'service=msrpc Endpoint Mapper\(tcp\) proto=6 src ' \108 'zone=office dst zone=internet action=Permit sent=0 ' \109 'rcvd=16384 dst=.+ src=.+'110 self.assertRegexpMatches(res.text, pattern)111 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)112 def test_domain_demoevents(self):113 client = ConnectApiClient()114 res = client.get_domain_demoevents(limit=1)115 # pylint: disable=anomalous-backslash-in-string116 pattern = '.+\s+\d+ .+ TCP_MISS/.+GET http://\S+/\S+ - DIRECT/.*'117 self.assertRegexpMatches(res.text, pattern)118 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)119 def test_hash_demoevents(self):120 client = ConnectApiClient()121 res = client.get_hash_demoevents(limit=1)122 pattern = '.+ Application hash: [0-9a-f]+, .+'123 self.assertRegexpMatches(res.text, pattern)124 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)125 def test_url_demoevents(self):126 client = ConnectApiClient()127 res = client.get_url_demoevents(limit=1)128 # pylint: disable=anomalous-backslash-in-string129 pattern = '.+\s+\d+ .+ TCP_MISS/.+GET https?://\S+/\S+ - DIRECT/.*'130 self.assertRegexpMatches(res.text, pattern)131 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)132 def test_vuln_demoevents(self):133 client = ConnectApiClient()134 res = client.get_vulnerability_demoevents(limit=1)135 self.assertEquals(res.text[:20], u'{"_scan_result_info"')136 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)137 def test_url_risklist(self):138 client = ConnectApiClient()139 resp = client.get_url_risklist(gzip=False)140 itr = resp.iter_lines()141 header = next(itr)142 self.assertIsInstance(header, basestring)143 entries = list(itr)144 self.assertGreater(len(entries), 10)145 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)146 def test_url_risklist_gzip(self):147 """download gzip and write to a byte buffer"""148 client = ConnectApiClient()149 resp = client.get_url_risklist(gzip=True)150 buf = io.BytesIO()151 for itr in resp.iter_content(chunk_size=1024):152 buf.write(itr)153 buf.seek(0)154 self.assertGreater(len(buf.read()), 1000)155 buf.close()156 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)157 def test_get_url(self):158 client = ConnectApiClient()159 url = 'https://sites.google.com/site/unblockingnotice/'160 resp = client.lookup_url(url)161 self.assertIsInstance(resp, DotAccessDict)162 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)163 def test_get_hash_extension(self):164 client = ConnectApiClient()165 hash_val = '21232f297a57a5a743894a0e4a801fc3'166 info = client.get_hash_extension(hash_val, 'active_reversinglabs')167 self.assertIsInstance(info, DotAccessDict)168 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)169 def test_get_malware(self):170 client = ConnectApiClient()171 resp = client.lookup_malware('KoneQR')172 self.assertIsInstance(resp, DotAccessDict)173 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)174 def test_head_fusion_file(self):175 client = ConnectApiClient()176 info = client.head_fusion_file("/public/default_ip_risklist.csv")177 self.assertIsInstance(info, requests.structures.CaseInsensitiveDict)178 @unittest.skipUnless(IS_DEFAULT_API_URL, IS_DEFAULT_API_URL_MSG)179 def test_get_fusion_file(self):180 client = ConnectApiClient()181 info = client.get_fusion_file("/public/default_ip_risklist.csv")182 self.assertIsInstance(info, requests.models.Response)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful