Skip to content

Commit

Permalink
Fixed import of packages from requirement files with UTF-16 encoding (#…
Browse files Browse the repository at this point in the history
…1387)

* Fixed import of packages from requirement files with UTF-16 encoding

* Update CHANGES.md

* Load requirement files using the pip auto_decode function

---------

Co-authored-by: Cooper Lees <[email protected]>
  • Loading branch information
francescocaponio and cooperlees authored Feb 28, 2023
1 parent 08b28f7 commit e1e0365
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ max_line_length = 88
# E722 is a duplicate of B001.
# P207 is a duplicate of B003.
# W503 is against PEP8
ignore = E722, P207, W503
ignore = E722, P207, W503, E203
max-complexity = 20
exclude =
build,
Expand Down
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
- Move Docker to build in 3.11 `PR #1341`
- Add "--force-check" parameter to runner.py `PR #1347`

## Bug Fixes

- Fixed import of packages from requirement files with UTF-16 encoding `PR #1387`

# 6.1.0

## New Features
Expand Down
34 changes: 34 additions & 0 deletions src/bandersnatch/tests/plugins/test_allowlist_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,37 @@ def test__filter__find__glob__files(self) -> None:
# Check that the package in the last file, excluded
# from the glob is not considered
self.assertNotIn("baz", mirror.packages_to_sync)

def test__filter__requirements__utf16__encoding(self) -> None:
"""Check that a requirements file written as UTF-16 still filters packages."""
# Create the requirements file in the test's temporary directory,
# explicitly encoded as UTF-16.
absolute_file_path = Path(self.tempdir.name) / "requirements.txt"
with open(absolute_file_path, "w", encoding="UTF-16") as fh:
fh.write(
"""\
foo==1.2.0 # via -r requirements.in
"""
)

# Enable the project_requirements plugin and list the file above under
# the allowlist "requirements" option.
mock_config(
f"""\
[mirror]
storage-backend = filesystem
workers = 2
[plugins]
enabled =
project_requirements
[allowlist]
requirements =
{absolute_file_path}
"""
)

mirror = BandersnatchMirror(Path("."), Master(url="https://foo.bar.com"))

# Three candidate packages; only "foo" appears in the requirements file.
mirror.packages_to_sync = {
"foo": "",
"bar": "",
"baz": "",
}
mirror._filter_packages()
# After filtering, "foo" must be the only package left to sync.
self.assertEqual({"foo": ""}, mirror.packages_to_sync)
7 changes: 5 additions & 2 deletions src/bandersnatch_filter_plugins/allowlist_name.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from bandersnatch.filter import FilterProjectPlugin, FilterReleasePlugin

from .encoding import auto_decode

logger = logging.getLogger("bandersnatch")


Expand Down Expand Up @@ -153,8 +155,9 @@ def _determine_unfiltered_package_names(self) -> List[str]:
return []

for filepath in filepaths:
with open(filepath) as req_fh:
filtered_requirements |= _parse_package_lines(req_fh.readlines())
with open(filepath, "rb") as req_fh:
content = auto_decode(req_fh.read())
filtered_requirements |= _parse_package_lines(content.splitlines())
return list(req.name for req in filtered_requirements)


Expand Down
36 changes: 36 additions & 0 deletions src/bandersnatch_filter_plugins/encoding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import codecs
import locale
import re
import sys
from typing import List, Tuple

# Byte-order marks paired with the codec used to decode the bytes that follow.
#
# Order matters: auto_decode() uses the first entry whose BOM is a prefix of
# the data, and the UTF-32 little-endian BOM (FF FE 00 00) itself begins with
# the UTF-16 little-endian BOM (FF FE).  The UTF-32 entries therefore have to
# be tested before the UTF-16 ones, otherwise UTF-32-LE input is mistaken for
# UTF-16 and decoded into text riddled with NUL characters.
BOMS: List[Tuple[bytes, str]] = [
    (codecs.BOM_UTF8, "utf-8"),
    (codecs.BOM_UTF32, "utf-32"),
    (codecs.BOM_UTF32_BE, "utf-32-be"),
    (codecs.BOM_UTF32_LE, "utf-32-le"),
    (codecs.BOM_UTF16, "utf-16"),
    (codecs.BOM_UTF16_BE, "utf-16-be"),
    (codecs.BOM_UTF16_LE, "utf-16-le"),
]

# Matches a PEP 263 style declaration such as "coding: utf-8" or
# "coding=latin-1" and captures the codec name.
ENCODING_RE = re.compile(rb"coding[:=]\s*([-\w.]+)")


def auto_decode(data: bytes) -> str:
    """Decode the raw bytes of a text file to ``str``.

    The encoding is chosen by the first rule that applies:

    1. A leading byte-order mark listed in ``BOMS``; the mark is stripped
       and the remainder decoded with the associated codec.
    2. A PEP 263 style ``coding: <name>`` declaration found in a ``#``
       comment on one of the first two lines.
    3. ``locale.getpreferredencoding(False)``, falling back to
       ``sys.getdefaultencoding()``, like ``open()`` on Python 3.

    Args:
        data: The undecoded file content.

    Returns:
        The decoded text.

    Raises:
        UnicodeDecodeError: If the bytes are not valid in the chosen encoding.
        LookupError: If a coding declaration names an unknown codec.
    """
    for bom, encoding in BOMS:
        if data.startswith(bom):
            return data[len(bom) :].decode(encoding)
    # Let's check the first two lines as in PEP 263.  Splitting at most twice
    # is enough to isolate them without cutting up the whole payload.
    for line in data.split(b"\n", 2)[:2]:
        if not line.startswith(b"#"):
            continue
        match = ENCODING_RE.search(line)
        if match is not None:
            # The pattern only captures ASCII word characters, "-" and ".",
            # so decoding the codec name as ASCII cannot fail.
            return data.decode(match.group(1).decode("ascii"))
    return data.decode(
        locale.getpreferredencoding(False) or sys.getdefaultencoding(),
    )

0 comments on commit e1e0365

Please sign in to comment.