"Test posix functions" | |
from test import test_support | |
# Skip these tests if there is no posix module. | |
posix = test_support.import_module('posix') | |
import errno | |
import sys | |
import time | |
import os | |
import pwd | |
import shutil | |
import sys | |
import unittest | |
import warnings | |
warnings.filterwarnings('ignore', '.* potential security risk .*', | |
RuntimeWarning) | |
class PosixTester(unittest.TestCase): | |
def setUp(self): | |
# create empty file | |
fp = open(test_support.TESTFN, 'w+') | |
fp.close() | |
def tearDown(self): | |
os.unlink(test_support.TESTFN) | |
def testNoArgFunctions(self): | |
# test posix functions which take no arguments and have | |
# no side-effects which we need to cleanup (e.g., fork, wait, abort) | |
NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdu", "uname", | |
"times", "getloadavg", "tmpnam", | |
"getegid", "geteuid", "getgid", "getgroups", | |
"getpid", "getpgrp", "getppid", "getuid", | |
] | |
with warnings.catch_warnings(): | |
warnings.filterwarnings("ignore", "", DeprecationWarning) | |
for name in NO_ARG_FUNCTIONS: | |
posix_func = getattr(posix, name, None) | |
if posix_func is not None: | |
posix_func() | |
self.assertRaises(TypeError, posix_func, 1) | |
if hasattr(posix, 'getresuid'): | |
def test_getresuid(self): | |
user_ids = posix.getresuid() | |
self.assertEqual(len(user_ids), 3) | |
for val in user_ids: | |
self.assertGreaterEqual(val, 0) | |
if hasattr(posix, 'getresgid'): | |
def test_getresgid(self): | |
group_ids = posix.getresgid() | |
self.assertEqual(len(group_ids), 3) | |
for val in group_ids: | |
self.assertGreaterEqual(val, 0) | |
if hasattr(posix, 'setresuid'): | |
def test_setresuid(self): | |
current_user_ids = posix.getresuid() | |
self.assertIsNone(posix.setresuid(*current_user_ids)) | |
# -1 means don't change that value. | |
self.assertIsNone(posix.setresuid(-1, -1, -1)) | |
def test_setresuid_exception(self): | |
# Don't do this test if someone is silly enough to run us as root. | |
current_user_ids = posix.getresuid() | |
if 0 not in current_user_ids: | |
new_user_ids = (current_user_ids[0]+1, -1, -1) | |
self.assertRaises(OSError, posix.setresuid, *new_user_ids) | |
if hasattr(posix, 'setresgid'): | |
def test_setresgid(self): | |
current_group_ids = posix.getresgid() | |
self.assertIsNone(posix.setresgid(*current_group_ids)) | |
# -1 means don't change that value. | |
self.assertIsNone(posix.setresgid(-1, -1, -1)) | |
def test_setresgid_exception(self): | |
# Don't do this test if someone is silly enough to run us as root. | |
current_group_ids = posix.getresgid() | |
if 0 not in current_group_ids: | |
new_group_ids = (current_group_ids[0]+1, -1, -1) | |
self.assertRaises(OSError, posix.setresgid, *new_group_ids) | |
@unittest.skipUnless(hasattr(posix, 'initgroups'), | |
"test needs os.initgroups()") | |
def test_initgroups(self): | |
# It takes a string and an integer; check that it raises a TypeError | |
# for other argument lists. | |
self.assertRaises(TypeError, posix.initgroups) | |
self.assertRaises(TypeError, posix.initgroups, None) | |
self.assertRaises(TypeError, posix.initgroups, 3, "foo") | |
self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) | |
# If a non-privileged user invokes it, it should fail with OSError | |
# EPERM. | |
if os.getuid() != 0: | |
name = pwd.getpwuid(posix.getuid()).pw_name | |
try: | |
posix.initgroups(name, 13) | |
except OSError as e: | |
self.assertEqual(e.errno, errno.EPERM) | |
else: | |
self.fail("Expected OSError to be raised by initgroups") | |
def test_statvfs(self): | |
if hasattr(posix, 'statvfs'): | |
self.assertTrue(posix.statvfs(os.curdir)) | |
def test_fstatvfs(self): | |
if hasattr(posix, 'fstatvfs'): | |
fp = open(test_support.TESTFN) | |
try: | |
self.assertTrue(posix.fstatvfs(fp.fileno())) | |
finally: | |
fp.close() | |
def test_ftruncate(self): | |
if hasattr(posix, 'ftruncate'): | |
fp = open(test_support.TESTFN, 'w+') | |
try: | |
# we need to have some data to truncate | |
fp.write('test') | |
fp.flush() | |
posix.ftruncate(fp.fileno(), 0) | |
finally: | |
fp.close() | |
def test_dup(self): | |
if hasattr(posix, 'dup'): | |
fp = open(test_support.TESTFN) | |
try: | |
fd = posix.dup(fp.fileno()) | |
self.assertIsInstance(fd, int) | |
os.close(fd) | |
finally: | |
fp.close() | |
def test_confstr(self): | |
if hasattr(posix, 'confstr'): | |
self.assertRaises(ValueError, posix.confstr, "CS_garbage") | |
self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True) | |
def test_dup2(self): | |
if hasattr(posix, 'dup2'): | |
fp1 = open(test_support.TESTFN) | |
fp2 = open(test_support.TESTFN) | |
try: | |
posix.dup2(fp1.fileno(), fp2.fileno()) | |
finally: | |
fp1.close() | |
fp2.close() | |
def fdopen_helper(self, *args): | |
fd = os.open(test_support.TESTFN, os.O_RDONLY) | |
fp2 = posix.fdopen(fd, *args) | |
fp2.close() | |
def test_fdopen(self): | |
if hasattr(posix, 'fdopen'): | |
self.fdopen_helper() | |
self.fdopen_helper('r') | |
self.fdopen_helper('r', 100) | |
def test_osexlock(self): | |
if hasattr(posix, "O_EXLOCK"): | |
fd = os.open(test_support.TESTFN, | |
os.O_WRONLY|os.O_EXLOCK|os.O_CREAT) | |
self.assertRaises(OSError, os.open, test_support.TESTFN, | |
os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) | |
os.close(fd) | |
if hasattr(posix, "O_SHLOCK"): | |
fd = os.open(test_support.TESTFN, | |
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) | |
self.assertRaises(OSError, os.open, test_support.TESTFN, | |
os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK) | |
os.close(fd) | |
def test_osshlock(self): | |
if hasattr(posix, "O_SHLOCK"): | |
fd1 = os.open(test_support.TESTFN, | |
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) | |
fd2 = os.open(test_support.TESTFN, | |
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) | |
os.close(fd2) | |
os.close(fd1) | |
if hasattr(posix, "O_EXLOCK"): | |
fd = os.open(test_support.TESTFN, | |
os.O_WRONLY|os.O_SHLOCK|os.O_CREAT) | |
self.assertRaises(OSError, os.open, test_support.TESTFN, | |
os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK) | |
os.close(fd) | |
def test_fstat(self): | |
if hasattr(posix, 'fstat'): | |
fp = open(test_support.TESTFN) | |
try: | |
self.assertTrue(posix.fstat(fp.fileno())) | |
finally: | |
fp.close() | |
def test_stat(self): | |
if hasattr(posix, 'stat'): | |
self.assertTrue(posix.stat(test_support.TESTFN)) | |
def _test_all_chown_common(self, chown_func, first_param): | |
"""Common code for chown, fchown and lchown tests.""" | |
if os.getuid() == 0: | |
try: | |
# Many linux distros have a nfsnobody user as MAX_UID-2 | |
# that makes a good test case for signedness issues. | |
# http://bugs.python.org/issue1747858 | |
# This part of the test only runs when run as root. | |
# Only scary people run their tests as root. | |
ent = pwd.getpwnam('nfsnobody') | |
chown_func(first_param, ent.pw_uid, ent.pw_gid) | |
except KeyError: | |
pass | |
else: | |
# non-root cannot chown to root, raises OSError | |
self.assertRaises(OSError, chown_func, | |
first_param, 0, 0) | |
# test a successful chown call | |
chown_func(first_param, os.getuid(), os.getgid()) | |
@unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()") | |
def test_chown(self): | |
# raise an OSError if the file does not exist | |
os.unlink(test_support.TESTFN) | |
self.assertRaises(OSError, posix.chown, test_support.TESTFN, -1, -1) | |
# re-create the file | |
open(test_support.TESTFN, 'w').close() | |
self._test_all_chown_common(posix.chown, test_support.TESTFN) | |
@unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()") | |
def test_fchown(self): | |
os.unlink(test_support.TESTFN) | |
# re-create the file | |
test_file = open(test_support.TESTFN, 'w') | |
try: | |
fd = test_file.fileno() | |
self._test_all_chown_common(posix.fchown, fd) | |
finally: | |
test_file.close() | |
@unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()") | |
def test_lchown(self): | |
os.unlink(test_support.TESTFN) | |
# create a symlink | |
os.symlink('/tmp/dummy-symlink-target', test_support.TESTFN) | |
self._test_all_chown_common(posix.lchown, test_support.TESTFN) | |
def test_chdir(self): | |
if hasattr(posix, 'chdir'): | |
posix.chdir(os.curdir) | |
self.assertRaises(OSError, posix.chdir, test_support.TESTFN) | |
def test_lsdir(self): | |
if hasattr(posix, 'lsdir'): | |
self.assertIn(test_support.TESTFN, posix.lsdir(os.curdir)) | |
def test_access(self): | |
if hasattr(posix, 'access'): | |
self.assertTrue(posix.access(test_support.TESTFN, os.R_OK)) | |
def test_umask(self): | |
if hasattr(posix, 'umask'): | |
old_mask = posix.umask(0) | |
self.assertIsInstance(old_mask, int) | |
posix.umask(old_mask) | |
def test_strerror(self): | |
if hasattr(posix, 'strerror'): | |
self.assertTrue(posix.strerror(0)) | |
def test_pipe(self): | |
if hasattr(posix, 'pipe'): | |
reader, writer = posix.pipe() | |
os.close(reader) | |
os.close(writer) | |
def test_tempnam(self): | |
if hasattr(posix, 'tempnam'): | |
with warnings.catch_warnings(): | |
warnings.filterwarnings("ignore", "tempnam", DeprecationWarning) | |
self.assertTrue(posix.tempnam()) | |
self.assertTrue(posix.tempnam(os.curdir)) | |
self.assertTrue(posix.tempnam(os.curdir, 'blah')) | |
def test_tmpfile(self): | |
if hasattr(posix, 'tmpfile'): | |
with warnings.catch_warnings(): | |
warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning) | |
fp = posix.tmpfile() | |
fp.close() | |
def test_utime(self): | |
if hasattr(posix, 'utime'): | |
now = time.time() | |
posix.utime(test_support.TESTFN, None) | |
self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, None)) | |
self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (now, None)) | |
self.assertRaises(TypeError, posix.utime, test_support.TESTFN, (None, now)) | |
posix.utime(test_support.TESTFN, (int(now), int(now))) | |
posix.utime(test_support.TESTFN, (now, now)) | |
def test_chflags(self): | |
if hasattr(posix, 'chflags'): | |
st = os.stat(test_support.TESTFN) | |
if hasattr(st, 'st_flags'): | |
posix.chflags(test_support.TESTFN, st.st_flags) | |
def test_lchflags(self): | |
if hasattr(posix, 'lchflags'): | |
st = os.stat(test_support.TESTFN) | |
if hasattr(st, 'st_flags'): | |
posix.lchflags(test_support.TESTFN, st.st_flags) | |
def test_getcwd_long_pathnames(self): | |
if hasattr(posix, 'getcwd'): | |
dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef' | |
curdir = os.getcwd() | |
base_path = os.path.abspath(test_support.TESTFN) + '.getcwd' | |
try: | |
os.mkdir(base_path) | |
os.chdir(base_path) | |
except: | |
# Just returning nothing instead of the SkipTest exception, | |
# because the test results in Error in that case. | |
# Is that ok? | |
# raise unittest.SkipTest, "cannot create directory for testing" | |
return | |
try: | |
def _create_and_do_getcwd(dirname, current_path_length = 0): | |
try: | |
os.mkdir(dirname) | |
except: | |
raise unittest.SkipTest, "mkdir cannot create directory sufficiently deep for getcwd test" | |
os.chdir(dirname) | |
try: | |
os.getcwd() | |
if current_path_length < 4099: | |
_create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1) | |
except OSError as e: | |
expected_errno = errno.ENAMETOOLONG | |
if 'sunos' in sys.platform or 'openbsd' in sys.platform: | |
expected_errno = errno.ERANGE # Issue 9185 | |
self.assertEqual(e.errno, expected_errno) | |
finally: | |
os.chdir('..') | |
os.rmdir(dirname) | |
_create_and_do_getcwd(dirname) | |
finally: | |
os.chdir(curdir) | |
shutil.rmtree(base_path) | |
@unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()") | |
def test_getgroups(self): | |
with os.popen('id -G') as idg: | |
groups = idg.read().strip() | |
if not groups: | |
raise unittest.SkipTest("need working 'id -G'") | |
# 'id -G' and 'os.getgroups()' should return the same | |
# groups, ignoring order and duplicates. | |
# #10822 - it is implementation defined whether posix.getgroups() | |
# includes the effective gid so we include it anyway, since id -G does | |
self.assertEqual( | |
set([int(x) for x in groups.split()]), | |
set(posix.getgroups() + [posix.getegid()])) | |
class PosixGroupsTester(unittest.TestCase): | |
def setUp(self): | |
if posix.getuid() != 0: | |
raise unittest.SkipTest("not enough privileges") | |
if not hasattr(posix, 'getgroups'): | |
raise unittest.SkipTest("need posix.getgroups") | |
if sys.platform == 'darwin': | |
raise unittest.SkipTest("getgroups(2) is broken on OSX") | |
self.saved_groups = posix.getgroups() | |
def tearDown(self): | |
if hasattr(posix, 'setgroups'): | |
posix.setgroups(self.saved_groups) | |
elif hasattr(posix, 'initgroups'): | |
name = pwd.getpwuid(posix.getuid()).pw_name | |
posix.initgroups(name, self.saved_groups[0]) | |
@unittest.skipUnless(hasattr(posix, 'initgroups'), | |
"test needs posix.initgroups()") | |
def test_initgroups(self): | |
# find missing group | |
g = max(self.saved_groups) + 1 | |
name = pwd.getpwuid(posix.getuid()).pw_name | |
posix.initgroups(name, g) | |
self.assertIn(g, posix.getgroups()) | |
@unittest.skipUnless(hasattr(posix, 'setgroups'), | |
"test needs posix.setgroups()") | |
def test_setgroups(self): | |
for groups in [[0], range(16)]: | |
posix.setgroups(groups) | |
self.assertListEqual(groups, posix.getgroups()) | |
def test_main(): | |
test_support.run_unittest(PosixTester, PosixGroupsTester) | |
if __name__ == '__main__': | |
test_main() |