From 6c685a490cc86f3c8c74c329078852b14889827c Mon Sep 17 00:00:00 2001 From: mrbean-bremen Date: Sat, 9 Dec 2017 16:49:15 +0100 Subject: [PATCH] Added support for patching scandir module - module contains backported os.scandir --- fake_filesystem_test.py | 24 +-- pyfakefs/fake_filesystem.py | 186 +---------------------- pyfakefs/fake_filesystem_unittest.py | 11 +- pyfakefs/fake_pathlib.py | 3 +- pyfakefs/fake_scandir.py | 218 +++++++++++++++++++++++++++ 5 files changed, 246 insertions(+), 196 deletions(-) create mode 100644 pyfakefs/fake_scandir.py diff --git a/fake_filesystem_test.py b/fake_filesystem_test.py index 8514abef..18121f3c 100755 --- a/fake_filesystem_test.py +++ b/fake_filesystem_test.py @@ -30,7 +30,7 @@ from pyfakefs import fake_filesystem from pyfakefs.fake_filesystem import FakeFileOpen - +from pyfakefs.fake_filesystem_unittest import has_scandir class _DummyTime(object): """Mock replacement for time.time. Increases returned time on access.""" @@ -2676,14 +2676,6 @@ def testClassifyDirectoryContents(self): self.assertEqual(test_directories, dirs) self.assertEqual(test_files, files) - def testClassifyDoesNotHideExceptions(self): - """_ClassifyDirectoryContents should not hide exceptions.""" - self.skipRealFs() - directory = self.makePath('foo') - self.assertEqual(False, self.os.path.exists(directory)) - self.assertRaisesOSError(errno.ENOENT, - self.os._ClassifyDirectoryContents, directory) - # os.mknod does not work under MacOS due to permission issues # so we test it under Linux only @@ -4826,12 +4818,21 @@ def testOpen(self): self.assertLess(0, fd) -@unittest.skipIf(sys.version_info < (3, 5), +@unittest.skipIf(sys.version_info < (3, 5) and not has_scandir, 'os.scandir was introduced in Python 3.5') class FakeScandirTest(FakeOsModuleTestBase): def setUp(self): super(FakeScandirTest, self).setUp() self.skipIfSymlinkNotSupported() + + if has_scandir: + if self.useRealFs(): + from scandir import scandir + else: + from fake_scandir import scandir + else: + scandir = self.os.scandir + directory = self.makePath('xyzzy', 'plugh') link_dir = self.makePath('linked', 'plugh') self.linked_file_path = self.os.path.join(link_dir, 'file') @@ -4847,8 +4848,7 @@ def setUp(self): self.createLink(self.file_link_path, self.linked_file_path) self.dir_link_path = self.os.path.join(directory, 'link_dir') self.createLink(self.dir_link_path, self.linked_dir_path) - - self.dir_entries = [entry for entry in self.os.scandir(directory)] + self.dir_entries = [entry for entry in scandir(directory)] self.dir_entries = sorted(self.dir_entries, key=lambda entry: entry.name) diff --git a/pyfakefs/fake_filesystem.py b/pyfakefs/fake_filesystem.py index d8f6d411..ffae28f8 100644 --- a/pyfakefs/fake_filesystem.py +++ b/pyfakefs/fake_filesystem.py @@ -105,6 +105,8 @@ import stat from copy import copy +from pyfakefs.fake_scandir import scandir, walk + __pychecker__ = 'no-reimportself' __version__ = '3.4' @@ -2738,136 +2740,6 @@ def ListDir(self, target_directory): directory_contents = directory.contents return list(directory_contents.keys()) - if sys.version_info >= (3, 5): - class DirEntry(): - """Emulates os.DirEntry. Note that we did not enforce keyword only arguments.""" - - def __init__(self, filesystem): - """Initialize the dir entry with unset values. - - Args: - filesystem: the fake filesystem used for implementation. - """ - self._filesystem = filesystem - self.name = '' - self.path = '' - self._inode = None - self._islink = False - self._isdir = False - self._statresult = None - self._statresult_symlink = None - - def inode(self): - """Return the inode number of the entry.""" - if self._inode is None: - self.stat(follow_symlinks=False) - return self._inode - - def is_dir(self, follow_symlinks=True): - """Return True if this entry is a directory entry. - - Args: - follow_symlinks: If True, also return True if this entry is a symlink - pointing to a directory. - - Returns: - True if this entry is an existing directory entry, or if - follow_symlinks is set, and this entry points to an existing directory entry. - """ - return self._isdir and (follow_symlinks or not self._islink) - - def is_file(self, follow_symlinks=True): - """Return True if this entry is a regular file entry. - - Args: - follow_symlinks: If True, also return True if this entry is a symlink - pointing to a regular file. - - Returns: - True if this entry is an existing file entry, or if - follow_symlinks is set, and this entry points to an existing file entry. - """ - return not self._isdir and (follow_symlinks or not self._islink) - - def is_symlink(self): - """Return True if this entry is a symbolic link (even if broken).""" - return self._islink - - def stat(self, follow_symlinks=True): - """Return a stat_result object for this entry. - - Args: - follow_symlinks: If False and the entry is a symlink, return the - result for the symlink, otherwise for the object it points to. - """ - if follow_symlinks: - if self._statresult_symlink is None: - file_object = self._filesystem.ResolveObject(self.path) - if self._filesystem.is_windows_fs: - file_object.st_nlink = 0 - self._statresult_symlink = file_object.stat_result.copy() - return self._statresult_symlink - - if self._statresult is None: - file_object = self._filesystem.LResolveObject(self.path) - self._inode = file_object.st_ino - if self._filesystem.is_windows_fs: - file_object.st_nlink = 0 - self._statresult = file_object.stat_result.copy() - return self._statresult - - class ScanDirIter: - """Iterator for DirEntry objects returned from `scandir()` - function.""" - - def __init__(self, filesystem, path): - self.filesystem = filesystem - self.path = self.filesystem.ResolvePath(path) - contents = {} - try: - contents = self.filesystem.ConfirmDir(path).contents - except OSError: - pass - self.contents_iter = iter(contents) - - def __iter__(self): - return self - - def __next__(self): - entry = self.contents_iter.__next__() - dir_entry = self.filesystem.DirEntry(self.filesystem) - dir_entry.name = entry - dir_entry.path = self.filesystem.JoinPaths(self.path, dir_entry.name) - dir_entry._isdir = self.filesystem.IsDir(dir_entry.path) - dir_entry._islink = self.filesystem.IsLink(dir_entry.path) - return dir_entry - - if sys.version_info >= (3, 6): - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.close() - - def close(self): - pass - - def ScanDir(self, path=''): - """Return an iterator of DirEntry objects corresponding to the entries - in the directory given by path. - - Args: - path: Path to the target directory within the fake filesystem. - - Returns: - an iterator to an unsorted list of os.DirEntry objects for - each entry in path. - - Raises: - OSError: if the target is not a directory. - """ - return self.ScanDirIter(self, path) - def __str__(self): return str(self.root) @@ -3586,32 +3458,7 @@ def scandir(self, path=''): Raises: OSError: if the target is not a directory. """ - return self.filesystem.ScanDir(path) - - def _ClassifyDirectoryContents(self, root): - """Classify contents of a directory as files/directories. - - Args: - root: (str) Directory to examine. - - Returns: - (tuple) A tuple consisting of three values: the directory examined, - a list containing all of the directory entries, and a list - containing all of the non-directory entries. - (This is the same format as returned by the `os.walk` generator.) - - Raises: - Nothing on its own, but be ready to catch exceptions generated by - underlying mechanisms like `os.listdir`. - """ - dirs = [] - files = [] - for entry in self.listdir(root): - if self.path.isdir(self.path.join(root, entry)): - dirs.append(entry) - else: - files.append(entry) - return (root, dirs, files) + return scandir(self.filesystem, path) def walk(self, top, topdown=True, onerror=None, followlinks=False): """Perform an os.walk operation over the fake filesystem. @@ -3630,32 +3477,7 @@ def walk(self, top, topdown=True, onerror=None, followlinks=False): subdirectories. See the documentation for the builtin os module for further details. """ - def do_walk(top, topMost=False): - top = self.path.normpath(top) - if not topMost and not followlinks and self.path.islink(top): - return - try: - top_contents = self._ClassifyDirectoryContents(top) - except OSError as exc: - top_contents = None - if onerror is not None: - onerror(exc) - - if top_contents is not None: - if topdown: - yield top_contents - - for directory in top_contents[1]: - if not followlinks and self.path.islink(directory): - continue - for contents in do_walk(self.path.join(top, directory)): - yield contents - - if not topdown: - yield top_contents - - return do_walk(top, topMost=True) - + return walk(self.filesystem, top, topdown, onerror, followlinks) def readlink(self, path, dir_fd=None): """Read the target of a symlink. diff --git a/pyfakefs/fake_filesystem_unittest.py b/pyfakefs/fake_filesystem_unittest.py index 3ceb9b8d..f6986ab6 100644 --- a/pyfakefs/fake_filesystem_unittest.py +++ b/pyfakefs/fake_filesystem_unittest.py @@ -68,6 +68,13 @@ import unittest +try: + import scandir + import fake_scandir + has_scandir = True +except ImportError: + has_scandir = False + if sys.version_info < (3,): import __builtin__ as builtins # pylint: disable=import-error else: @@ -289,6 +296,8 @@ def __init__(self, additional_skip_names=None, patch_path=True, } if self.HAS_PATHLIB: self._fake_module_classes['pathlib'] = fake_pathlib.FakePathlibModule + if has_scandir: + self._fake_module_classes['scandir'] = fake_scandir.FakeScanDirModule self._modules = {} for name in self._fake_module_classes: @@ -364,7 +373,7 @@ def _patch_tempfile(self): """ if 'unlink' in tempfile._TemporaryFileWrapper.__dict__: # Python 2.7 to 3.2: unlink is a class method of _TemporaryFileWrapper - tempfile._TemporaryFileWrapper.unlink = self.fake_os.unlink + tempfile._TemporaryFileWrapper.unlink = self._fake_modules['os'].unlink # Python 3.0 to 3.2 (and PyPy3 based on Python 3.2): # `TemporaryDirectory._rmtree` is used instead of `shutil.rmtree` diff --git a/pyfakefs/fake_pathlib.py b/pyfakefs/fake_pathlib.py index 7b0d1cc3..4dc0ac9b 100644 --- a/pyfakefs/fake_pathlib.py +++ b/pyfakefs/fake_pathlib.py @@ -39,6 +39,7 @@ import errno +from pyfakefs import fake_scandir from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem @@ -86,7 +87,7 @@ class _FakeAccessor(pathlib._Accessor): # pylint: disable=protected-access chmod = _wrap_strfunc(FakeFilesystem.ChangeMode) if sys.version_info >= (3, 6): - scandir = _wrap_strfunc(FakeFilesystem.ScanDir) + scandir = _wrap_strfunc(fake_scandir.scandir) if hasattr(os, "lchmod"): lchmod = _wrap_strfunc(lambda fs, path, mode: FakeFilesystem.ChangeMode( diff --git a/pyfakefs/fake_scandir.py b/pyfakefs/fake_scandir.py new file mode 100644 index 00000000..ba1de7ce --- /dev/null +++ b/pyfakefs/fake_scandir.py @@ -0,0 +1,218 @@ +import sys + + +class DirEntry(object): + """Emulates os.DirEntry. Note that we did not enforce keyword only arguments.""" + + def __init__(self, filesystem): + """Initialize the dir entry with unset values. + + Args: + filesystem: the fake filesystem used for implementation. + """ + self._filesystem = filesystem + self.name = '' + self.path = '' + self._inode = None + self._islink = False + self._isdir = False + self._statresult = None + self._statresult_symlink = None + + def inode(self): + """Return the inode number of the entry.""" + if self._inode is None: + self.stat(follow_symlinks=False) + return self._inode + + def is_dir(self, follow_symlinks=True): + """Return True if this entry is a directory entry. + + Args: + follow_symlinks: If True, also return True if this entry is a symlink + pointing to a directory. + + Returns: + True if this entry is an existing directory entry, or if + follow_symlinks is set, and this entry points to an existing directory entry. + """ + return self._isdir and (follow_symlinks or not self._islink) + + def is_file(self, follow_symlinks=True): + """Return True if this entry is a regular file entry. + + Args: + follow_symlinks: If True, also return True if this entry is a symlink + pointing to a regular file. + + Returns: + True if this entry is an existing file entry, or if + follow_symlinks is set, and this entry points to an existing file entry. + """ + return not self._isdir and (follow_symlinks or not self._islink) + + def is_symlink(self): + """Return True if this entry is a symbolic link (even if broken).""" + return self._islink + + def stat(self, follow_symlinks=True): + """Return a stat_result object for this entry. + + Args: + follow_symlinks: If False and the entry is a symlink, return the + result for the symlink, otherwise for the object it points to. + """ + if follow_symlinks: + if self._statresult_symlink is None: + file_object = self._filesystem.ResolveObject(self.path) + if self._filesystem.is_windows_fs: + file_object.st_nlink = 0 + self._statresult_symlink = file_object.stat_result.copy() + return self._statresult_symlink + + if self._statresult is None: + file_object = self._filesystem.LResolveObject(self.path) + self._inode = file_object.st_ino + if self._filesystem.is_windows_fs: + file_object.st_nlink = 0 + self._statresult = file_object.stat_result.copy() + return self._statresult + + +class ScanDirIter: + """Iterator for DirEntry objects returned from `scandir()` + function.""" + + def __init__(self, filesystem, path): + self.filesystem = filesystem + self.path = self.filesystem.ResolvePath(path) + contents = {} + try: + contents = self.filesystem.ConfirmDir(path).contents + except OSError: + pass + self.contents_iter = iter(contents) + + def __iter__(self): + return self + + def __next__(self): + entry = self.contents_iter.__next__() + dir_entry = DirEntry(self.filesystem) + dir_entry.name = entry + dir_entry.path = self.filesystem.JoinPaths(self.path, dir_entry.name) + dir_entry._isdir = self.filesystem.IsDir(dir_entry.path) + dir_entry._islink = self.filesystem.IsLink(dir_entry.path) + return dir_entry + + if sys.version_info >= (3, 6): + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + pass + + +def scandir(filesystem, path=''): + """Return an iterator of DirEntry objects corresponding to the entries + in the directory given by path. + + Args: + filesystem: The fake filesystem used for implementation + path: Path to the target directory within the fake filesystem. + + Returns: + an iterator to an unsorted list of os.DirEntry objects for + each entry in path. + + Raises: + OSError: if the target is not a directory. + """ + return ScanDirIter(filesystem, path) + + +def _classify_directory_contents(filesystem, root): + """Classify contents of a directory as files/directories. + + Args: + filesystem: The fake filesystem used for implementation + root: (str) Directory to examine. + + Returns: + (tuple) A tuple consisting of three values: the directory examined, + a list containing all of the directory entries, and a list + containing all of the non-directory entries. + (This is the same format as returned by the `os.walk` generator.) + + Raises: + Nothing on its own, but be ready to catch exceptions generated by + underlying mechanisms like `os.listdir`. + """ + dirs = [] + files = [] + for entry in filesystem.ListDir(root): + if filesystem.IsDir(filesystem.JoinPaths(root, entry)): + dirs.append(entry) + else: + files.append(entry) + return root, dirs, files + + +def walk(filesystem, top, topdown=True, onerror=None, followlinks=False): + """Perform an os.walk operation over the fake filesystem. + + Args: + filesystem: The fake filesystem used for implementation + top: The root directory from which to begin walk. + topdown: Determines whether to return the tuples with the root as + the first entry (`True`) or as the last, after all the child + directory tuples (`False`). + onerror: If not `None`, function which will be called to handle the + `os.error` instance provided when `os.listdir()` fails. + followlinks: If `True`, symbolic links are followed. + + Yields: + (path, directories, nondirectories) for top and each of its + subdirectories. See the documentation for the builtin os module + for further details. + """ + + def do_walk(top_dir, top_most=False): + top_dir = filesystem.CollapsePath(top_dir) + if not top_most and not followlinks and filesystem.IsLink(top_dir): + return + try: + top_contents = _classify_directory_contents(filesystem, top_dir) + except OSError as exc: + top_contents = None + if onerror is not None: + onerror(exc) + + if top_contents is not None: + if topdown: + yield top_contents + + for directory in top_contents[1]: + if not followlinks and filesystem.IsLink(directory): + continue + for contents in do_walk(filesystem.JoinPaths(top_dir, directory)): + yield contents + + if not topdown: + yield top_contents + + return do_walk(top, top_most=True) + + +class FakeScanDirModule(object): + def __init__(self, filesystem): + self.filesystem = filesystem + + def scandir(self, path='.'): + return scandir(self.filesystem, path) + + def walk(self, top, topdown=True, onerror=None, followlinks=False): + return walk(self.filesystem, top, topdown, onerror, followlinks)