From dfa0262f19a9060702b3a56ba8f5a53b80465609 Mon Sep 17 00:00:00 2001 From: mrbean-bremen Date: Sat, 3 Jul 2021 20:15:05 +0200 Subject: [PATCH] Add type hints for more classes - includes FakeFileOpen and all file wrapper classes - see #599 --- pyfakefs/fake_filesystem.py | 285 ++++++++++++++++++++++-------------- pyfakefs/helpers.py | 3 + 2 files changed, 177 insertions(+), 111 deletions(-) diff --git a/pyfakefs/fake_filesystem.py b/pyfakefs/fake_filesystem.py index 12908d8b..21338ee0 100644 --- a/pyfakefs/fake_filesystem.py +++ b/pyfakefs/fake_filesystem.py @@ -102,6 +102,7 @@ import traceback import uuid from collections import namedtuple +from doctest import TestResults from enum import Enum from stat import ( S_IFREG, S_IFDIR, S_ISLNK, S_IFMT, S_ISDIR, S_IFLNK, S_ISREG, S_IFSOCK @@ -109,7 +110,7 @@ from types import ModuleType from typing import ( List, Optional, Callable, Union, Any, Dict, Tuple, cast, AnyStr, overload, - NoReturn, ClassVar, IO + NoReturn, ClassVar, IO, Iterator, TextIO ) from pyfakefs.deprecator import Deprecator from pyfakefs.extra_packages import use_scandir @@ -327,6 +328,7 @@ def __init__(self, name: AnyStr, self.parent_dir: Optional[FakeDirectory] = None # Linux specific: extended file system attributes self.xattr: Dict = {} + self.opened_as: AnyString = '' @property def byte_contents(self) -> Optional[bytes]: @@ -3856,6 +3858,7 @@ def open(self, path: AnyStr, flags: int, mode: Optional[int] = None, *, fake_file = FakeFileOpen( self.filesystem, delete_on_close=delete_on_close, raw_io=True)( path, str_flags, open_modes=open_modes) + assert not isinstance(fake_file, StandardStreamWrapper) if fake_file.file_object != self.filesystem.dev_null: self.chmod(path, mode) return fake_file.fileno() @@ -3873,7 +3876,7 @@ def close(self, fd: int) -> None: file_handle = self.filesystem.get_open_file(fd) file_handle.close() - def read(self, fd: int, n: int) -> int: + def read(self, fd: int, n: int) -> bytes: """Read number of bytes from a file descriptor, returns bytes read. Args: @@ -3960,6 +3963,7 @@ def fstat(self, fd: int) -> FakeStatResult: """ # stat should return the tuple representing return value of os.stat file_object = self.filesystem.get_open_file(fd).get_object() + assert isinstance(file_object, FakeFile) return file_object.stat_result.copy() def umask(self, mask: int) -> int: @@ -4438,9 +4442,9 @@ def _path_with_dir_fd(self, path: AnyStr, fct: Callable, raise ValueError("%s: Can't specify dir_fd without " "matching path_str" % fct.__name__) if not self.path.isabs(path): - return self.path.join( - self.filesystem.get_open_file( - dir_fd).get_object().path, path) + open_file = self.filesystem.get_open_file(dir_fd) + return self.path.join( # type: ignore[type-var, return-value] + cast(FakeFile, open_file.get_object()).path, path) return path def truncate(self, path: AnyStr, length: int) -> None: @@ -4473,7 +4477,10 @@ def ftruncate(self, fd: int, length: int) -> None: OSError: if the file descriptor is invalid """ file_object = self.filesystem.get_open_file(fd).get_object() - file_object.size = length + if isinstance(file_object, FakeFileWrapper): + file_object.size = length + else: + raise OSError(errno.EBADF, 'Invalid file descriptor') def access(self, path: AnyStr, mode: int, *, dir_fd: Optional[int] = None, @@ -4811,7 +4818,7 @@ def open(self, file: Union[AnyStr, int], errors: Optional[str] = None, newline: Optional[str] = None, closefd: bool = True, - opener: Optional[Callable] = None) -> Union["FakeFileWrapper", + opener: Optional[Callable] = None) -> Union[AnyFileWrapper, IO[Any]]: """Redirect the call to FakeFileOpen. See FakeFileOpen.call() for description. @@ -4860,12 +4867,15 @@ class FakeFileWrapper: the FakeFile object on close() or flush(). """ - def __init__(self, file_object, file_path, update, read, - append, delete_on_close, filesystem, - newline, binary, closefd, encoding, - errors, buffering, raw_io, is_stream=False): + def __init__(self, file_object: FakeFile, + file_path: AnyStr, + update: bool, read: bool, append: bool, delete_on_close: bool, + filesystem: FakeFilesystem, + newline: Optional[str], binary: bool, closefd: bool, + encoding: Optional[str], errors: Optional[str], + buffering: int, raw_io: bool, is_stream: bool = False): self.file_object = file_object - self.file_path = file_path + self.file_path = file_path # type: ignore[var-annotated] self._append = append self._read = read self.allow_update = update @@ -4886,10 +4896,11 @@ def __init__(self, file_object, file_path, update, read, contents = file_object.byte_contents self._encoding = encoding or locale.getpreferredencoding(False) errors = errors or 'strict' - self._io = (BinaryBufferIO(contents) if binary - else TextBufferIO(contents, encoding=encoding, - newline=newline, errors=errors)) - + self._io: Union[BinaryBufferIO, TextBufferIO] = ( + BinaryBufferIO(contents) if binary + else TextBufferIO(contents, encoding=encoding, + newline=newline, errors=errors) + ) self._read_whence = 0 self._read_seek = 0 self._flush_pos = 0 @@ -4909,31 +4920,33 @@ def __init__(self, file_object, file_path, update, read, # override, don't modify FakeFile.name, as FakeFilesystem expects # it to be the file name only, no directories. self.name = file_object.opened_as - self.filedes = None + self.filedes: Optional[int] = None - def __enter__(self): + def __enter__(self) -> 'FakeFileWrapper': """To support usage of this fake file with the 'with' statement.""" return self - def __exit__(self, type, value, traceback): + def __exit__(self, type: Any, value: Any, traceback: Any) -> None: """To support usage of this fake file with the 'with' statement.""" self.close() - def _raise(self, message): + def _raise(self, message: str) -> NoReturn: if self.raw_io: self._filesystem.raise_os_error(errno.EBADF, self.file_path) raise io.UnsupportedOperation(message) - def get_object(self): + def get_object(self) -> FakeFile: """Return the FakeFile object that is wrapped by the current instance. """ return self.file_object - def fileno(self): + def fileno(self) -> int: """Return the file descriptor of the file object.""" - return self.filedes + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, 'Invalid file descriptor') - def close(self): + def close(self) -> None: """Close the file.""" # ignore closing a closed file if not self._is_open(): @@ -4944,19 +4957,24 @@ def close(self): self.flush() if self._filesystem.is_windows_fs and self._changed: self.file_object.st_mtime = now() + + assert self.filedes is not None if self._closefd: self._filesystem._close_open_file(self.filedes) else: - self._filesystem.open_files[self.filedes].remove(self) + open_files = self._filesystem.open_files[self.filedes] + assert open_files is not None + open_files.remove(self) if self.delete_on_close: - self._filesystem.remove_object(self.get_object().path) + self._filesystem.remove_object( + self.get_object().path) # type: ignore[arg-type] @property - def closed(self): + def closed(self) -> bool: """Simulate the `closed` attribute on file.""" return not self._is_open() - def _try_flush(self, old_pos): + def _try_flush(self, old_pos: int) -> None: """Try to flush and reset the position if it fails.""" flush_pos = self._flush_pos try: @@ -4968,7 +4986,7 @@ def _try_flush(self, old_pos): self._flush_pos = flush_pos raise - def flush(self): + def flush(self) -> None: """Flush file contents to 'disk'.""" self._check_open_file() if self.allow_update and not self.is_stream: @@ -4976,6 +4994,7 @@ def flush(self): if self._append: self._sync_io() old_contents = self.file_object.byte_contents + assert old_contents is not None contents = old_contents + contents[self._flush_pos:] self._set_stream_contents(contents) else: @@ -4993,19 +5012,20 @@ def flush(self): if not self.is_stream: self._flush_related_files() - def update_flush_pos(self): + def update_flush_pos(self) -> None: self._flush_pos = self._io.tell() - def _flush_related_files(self): + def _flush_related_files(self) -> None: for open_files in self._filesystem.open_files[3:]: if open_files is not None: for open_file in open_files: if (open_file is not self and + isinstance(open_file, FakeFileWrapper) and self.file_object == open_file.file_object and not open_file._append): open_file._sync_io() - def seek(self, offset, whence=0): + def seek(self, offset: int, whence: int = 0) -> None: """Move read/write pointer in 'file'.""" self._check_open_file() if not self._append: @@ -5016,7 +5036,7 @@ def seek(self, offset, whence=0): if not self.is_stream: self.flush() - def tell(self): + def tell(self) -> int: """Return the file's current position. Returns: @@ -5036,16 +5056,17 @@ def tell(self): self._io.seek(write_seek) return self._read_seek - def _sync_io(self): + def _sync_io(self) -> None: """Update the stream with changes to the file object contents.""" if self._file_epoch == self.file_object.epoch: return contents = self.file_object.byte_contents + assert contents is not None self._set_stream_contents(contents) self._file_epoch = self.file_object.epoch - def _set_stream_contents(self, contents): + def _set_stream_contents(self, contents: bytes) -> None: whence = self._io.tell() self._io.seek(0) self._io.truncate() @@ -5053,7 +5074,7 @@ def _set_stream_contents(self, contents): if not self._append: self._io.seek(whence) - def _read_wrappers(self, name): + def _read_wrappers(self, name: str) -> Callable: """Wrap a stream attribute in a read wrapper. Returns a read_wrapper which tracks our own read pointer since the @@ -5089,7 +5110,7 @@ def read_wrapper(*args, **kwargs): return read_wrapper - def _other_wrapper(self, name): + def _other_wrapper(self, name: str) -> Callable: """Wrap a stream attribute in an other_wrapper. Args: @@ -5124,7 +5145,7 @@ def other_wrapper(*args, **kwargs): return other_wrapper - def _write_wrapper(self, name): + def _write_wrapper(self, name: str) -> Callable: """Wrap a stream attribute in a write_wrapper. Args: @@ -5174,16 +5195,17 @@ def write_wrapper(*args, **kwargs): return write_wrapper - def _adapt_size_for_related_files(self, size): + def _adapt_size_for_related_files(self, size: int) -> None: for open_files in self._filesystem.open_files[3:]: if open_files is not None: for open_file in open_files: if (open_file is not self and + isinstance(open_file, FakeFileWrapper) and self.file_object == open_file.file_object and - open_file._append): + cast(FakeFileWrapper, open_file)._append): open_file._read_seek += size - def _truncate_wrapper(self): + def _truncate_wrapper(self) -> Callable: """Wrap truncate() to allow flush after truncate. Returns: @@ -5213,11 +5235,11 @@ def truncate_wrapper(*args, **kwargs): return truncate_wrapper - def size(self): + def size(self) -> int: """Return the content size in bytes of the wrapped file.""" return self.file_object.st_size - def __getattr__(self, name): + def __getattr__(self, name: str) -> Any: if self.file_object.is_large_file(): raise FakeLargeFileIoException(self.file_path) @@ -5250,7 +5272,7 @@ def __getattr__(self, name): return getattr(self._io, name) - def _read_error(self): + def _read_error(self) -> Callable: def read_error(*args, **kwargs): """Throw an error unless the argument is zero.""" if args and args[0] == 0: @@ -5260,7 +5282,7 @@ def read_error(*args, **kwargs): return read_error - def _write_error(self): + def _write_error(self) -> Callable: def write_error(*args, **kwargs): """Throw an error.""" if self.raw_io: @@ -5271,16 +5293,19 @@ def write_error(*args, **kwargs): return write_error - def _is_open(self): - return (self.filedes < len(self._filesystem.open_files) and - self._filesystem.open_files[self.filedes] is not None and - self in self._filesystem.open_files[self.filedes]) + def _is_open(self) -> bool: + if (self.filedes is not None and + self.filedes < len(self._filesystem.open_files)): + open_files = self._filesystem.open_files[self.filedes] + if open_files is not None and self in open_files: + return True + return False - def _check_open_file(self): + def _check_open_file(self) -> None: if not self.is_stream and not self._is_open(): raise ValueError('I/O operation on closed file') - def __iter__(self): + def __iter__(self) -> Union[Iterator[str], Iterator[bytes]]: if not self._read: self._raise('File is not open for reading') return self._io.__iter__() @@ -5295,25 +5320,27 @@ class StandardStreamWrapper: """Wrapper for a system standard stream to be used in open files list. """ - def __init__(self, stream_object): + def __init__(self, stream_object: TextIO): self._stream_object = stream_object - self.filedes = None + self.filedes: Optional[int] = None - def get_object(self): + def get_object(self) -> TextIO: return self._stream_object - def fileno(self): + def fileno(self) -> int: """Return the file descriptor of the wrapped standard stream.""" - return self.filedes + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, 'Invalid file descriptor') - def read(self, n: int) -> int: - return self._stream_object.read() + def read(self, n: int) -> bytes: + return cast(bytes, self._stream_object.read()) - def close(self): + def close(self) -> None: """We do not support closing standard streams.""" pass - def is_stream(self): + def is_stream(self) -> bool: return True @@ -5321,23 +5348,27 @@ class FakeDirWrapper: """Wrapper for a FakeDirectory object to be used in open files list. """ - def __init__(self, file_object, file_path, filesystem): + def __init__(self, file_object: FakeDirectory, + file_path: AnyString, filesystem: FakeFilesystem): self.file_object = file_object self.file_path = file_path self._filesystem = filesystem - self.filedes = None + self.filedes: Optional[int] = None - def get_object(self): + def get_object(self) -> FakeDirectory: """Return the FakeFile object that is wrapped by the current instance. """ return self.file_object - def fileno(self): + def fileno(self) -> int: """Return the file descriptor of the file object.""" - return self.filedes + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, 'Invalid file descriptor') - def close(self): + def close(self) -> None: """Close the directory.""" + assert self.filedes is not None self._filesystem._close_open_file(self.filedes) @@ -5346,42 +5377,47 @@ class FakePipeWrapper: used in open files list. """ - def __init__(self, filesystem, fd): + def __init__(self, filesystem: FakeFilesystem, fd: int): self._filesystem = filesystem self.fd = fd # the real file descriptor self.file_object = None - self.filedes = None + self.filedes: Optional[int] = None - def __enter__(self): + def __enter__(self) -> 'FakePipeWrapper': """To support usage of this fake pipe with the 'with' statement.""" return self - def __exit__(self, type, value, traceback): + def __exit__(self, type: Any, value: Any, traceback: Any) -> None: """To support usage of this fake pipe with the 'with' statement.""" self.close() - def get_object(self): + def get_object(self) -> None: return self.file_object - def fileno(self): + def fileno(self) -> int: """Return the fake file descriptor of the pipe object.""" - return self.filedes + if self.filedes is not None: + return self.filedes + raise OSError(errno.EBADF, 'Invalid file descriptor') - def read(self, numBytes): + def read(self, numBytes: int) -> bytes: """Read from the real pipe.""" return os.read(self.fd, numBytes) - def flush(self): + def flush(self) -> None: """Flush the real pipe?""" pass - def write(self, contents): + def write(self, contents: bytes) -> int: """Write to the real pipe.""" return os.write(self.fd, contents) - def close(self): + def close(self) -> None: """Close the pipe descriptor.""" - self._filesystem.open_files[self.filedes].remove(self) + assert self.filedes is not None + open_files = self._filesystem.open_files[self.filedes] + assert open_files is not None + open_files.remove(self) os.close(self.fd) @@ -5397,7 +5433,8 @@ class FakeFileOpen: """ __name__ = 'FakeFileOpen' - def __init__(self, filesystem, delete_on_close=False, raw_io=False): + def __init__(self, filesystem: FakeFilesystem, + delete_on_close: bool = False, raw_io: bool = False): """ Args: filesystem: FakeFilesystem used to provide file system information @@ -5407,13 +5444,19 @@ def __init__(self, filesystem, delete_on_close=False, raw_io=False): self._delete_on_close = delete_on_close self.raw_io = raw_io - def __call__(self, *args, **kwargs): + def __call__(self, *args: Any, **kwargs: Any) -> AnyFileWrapper: """Redirects calls to file() or open() to appropriate method.""" return self.call(*args, **kwargs) - def call(self, file_, mode='r', buffering=-1, encoding=None, - errors=None, newline=None, closefd=True, opener=None, - open_modes=None): + def call(self, file_: Union[AnyStr, int], + mode: str = 'r', + buffering: int = -1, + encoding: Optional[str] = None, + errors: Optional[str] = None, + newline: Optional[str] = None, + closefd: bool = True, + opener: Any = None, + open_modes: Optional[_OpenModes] = None) -> AnyFileWrapper: """Return a file-like object with the contents of the target file object. @@ -5453,15 +5496,20 @@ def call(self, file_, mode='r', buffering=-1, encoding=None, file_) if file_object is None and file_path is None: # file must be a fake pipe wrapper, find it... - if (len(self.filesystem.open_files) <= filedes - or not self.filesystem.open_files[filedes]): + if (filedes is None or + len(self.filesystem.open_files) <= filedes or + not self.filesystem.open_files[filedes]): raise OSError(errno.EBADF, 'invalid pipe file descriptor') - real_fd = self.filesystem.open_files[filedes][0].fd - wrapper = FakePipeWrapper(self.filesystem, real_fd) + wrappers = self.filesystem.open_files[filedes] + assert wrappers is not None + existing_wrapper = wrappers[0] + assert isinstance(existing_wrapper, FakePipeWrapper) + wrapper = FakePipeWrapper(self.filesystem, existing_wrapper.fd) file_des = self.filesystem._add_open_file(wrapper) wrapper.filedes = file_des return wrapper + assert file_path is not None if not filedes: closefd = True @@ -5470,6 +5518,7 @@ def call(self, file_, mode='r', buffering=-1, encoding=None, not self.filesystem.is_windows_fs)): self.filesystem.raise_os_error(errno.EEXIST, file_path) + assert real_path is not None file_object = self._init_file_object(file_object, file_path, open_modes, real_path) @@ -5506,13 +5555,17 @@ def call(self, file_, mode='r', buffering=-1, encoding=None, if filedes is not None: fakefile.filedes = filedes # replace the file wrapper - self.filesystem.open_files[filedes].append(fakefile) + open_files_list = self.filesystem.open_files[filedes] + assert open_files_list is not None + open_files_list.append(fakefile) else: fakefile.filedes = self.filesystem._add_open_file(fakefile) return fakefile - def _init_file_object(self, file_object, file_path, - open_modes, real_path): + def _init_file_object(self, file_object: Optional[FakeFile], + file_path: AnyStr, + open_modes: _OpenModes, + real_path: AnyString) -> FakeFile: if file_object: if (not is_root() and ((open_modes.can_read and @@ -5529,7 +5582,8 @@ def _init_file_object(self, file_object, file_path, if self.filesystem.islink(file_path): link_object = self.filesystem.resolve(file_path, follow_symlinks=False) - target_path = link_object.contents + assert link_object.contents is not None + target_path = cast(AnyStr, link_object.contents) else: target_path = file_path if self.filesystem.ends_with_path_separator(target_path): @@ -5544,33 +5598,41 @@ def _init_file_object(self, file_object, file_path, apply_umask=True) return file_object - def _handle_file_arg(self, file_): + def _handle_file_arg(self, file_: Union[AnyStr, int]) -> Tuple[ + Optional[FakeFile], Optional[AnyStr], + Optional[int], Optional[AnyStr]]: file_object = None if isinstance(file_, int): # opening a file descriptor - filedes = file_ + filedes: int = file_ wrapper = self.filesystem.get_open_file(filedes) if isinstance(wrapper, FakePipeWrapper): return None, None, filedes, None - self._delete_on_close = wrapper.delete_on_close - file_object = self.filesystem.get_open_file(filedes).get_object() - file_path = file_object.name + if isinstance(wrapper, FakeFileWrapper): + self._delete_on_close = wrapper.delete_on_close + file_object = cast(FakeFile, self.filesystem.get_open_file( + filedes).get_object()) + assert file_object is not None + path = file_object.name + return file_object, cast(AnyStr, path), filedes, cast(AnyStr, path) + + # open a file file by path + file_path = cast(AnyStr, file_) + if file_path == self.filesystem.dev_null.name: + file_object = self.filesystem.dev_null real_path = file_path else: - # open a file file by path - filedes = None - file_path = file_ - if file_path == self.filesystem.dev_null.name: - file_object = self.filesystem.dev_null - real_path = file_path - else: - real_path = self.filesystem.resolve_path(file_path) - if self.filesystem.exists(file_path): - file_object = self.filesystem.get_object_from_normpath( - real_path, check_read_perm=False) - return file_object, file_path, filedes, real_path - - def _handle_file_mode(self, mode, newline, open_modes): + real_path = self.filesystem.resolve_path(file_path) + if self.filesystem.exists(file_path): + file_object = self.filesystem.get_object_from_normpath( + real_path, check_read_perm=False) + return file_object, file_path, None, real_path + + def _handle_file_mode( + self, mode: str, + newline: Optional[str], + open_modes: Optional[_OpenModes]) -> Tuple[Optional[str], + _OpenModes]: orig_modes = mode # Save original modes for error messages. # Normalize modes. Handle 't' and 'U'. if 'b' in mode and 't' in mode: @@ -5581,10 +5643,11 @@ def _handle_file_mode(self, mode, newline, open_modes): if mode not in _OPEN_MODE_MAP: raise ValueError('Invalid mode: %r' % orig_modes) open_modes = _OpenModes(*_OPEN_MODE_MAP[mode]) + assert open_modes is not None return newline, open_modes -def _run_doctest(): +def _run_doctest() -> TestResults: import doctest import pyfakefs return doctest.testmod(pyfakefs.fake_filesystem) diff --git a/pyfakefs/helpers.py b/pyfakefs/helpers.py index 047734e6..ba75d2a3 100644 --- a/pyfakefs/helpers.py +++ b/pyfakefs/helpers.py @@ -333,6 +333,9 @@ def st_ctime_ns(self, val: int) -> None: class BinaryBufferIO(io.BytesIO): """Stream class that handles byte contents for files.""" + def __init__(self, contents: Optional[bytes]): + super().__init__(contents or b'') + def putvalue(self, value: bytes) -> None: self.write(value)