diff --git a/src/packaging/metadata.py b/src/packaging/metadata.py index 1316e78b..09912ee5 100644 --- a/src/packaging/metadata.py +++ b/src/packaging/metadata.py @@ -281,6 +281,43 @@ def _get_payload(msg: email.message.Message, source: bytes | str) -> str: _RAW_TO_EMAIL_MAPPING = {raw: email for email, raw in _EMAIL_TO_RAW_MAPPING.items()} +# This class is for writing RFC822 messages +class RFC822Policy(email.policy.EmailPolicy): + """ + This is :class:`email.policy.EmailPolicy`, but with a simple ``header_store_parse`` + implementation that handles multiline values, and some nice defaults. + """ + + utf8 = True + mangle_from_ = False + max_line_length = 0 + + def header_store_parse(self, name: str, value: str) -> tuple[str, str]: + size = len(name) + 2 + value = value.replace("\n", "\n" + " " * size) + return (name, value) + + +# This class is for writing RFC822 messages +class RFC822Message(email.message.EmailMessage): + """ + This is :class:`email.message.EmailMessage` with two small changes: it defaults to + our `RFC822Policy`, and it correctly writes unicode when being called + with `bytes()`. + """ + + def __init__(self) -> None: + super().__init__(policy=RFC822Policy()) + + def as_bytes( + self, unixfrom: bool = False, policy: email.policy.Policy | None = None + ) -> bytes: + """ + This handles unicode encoding. + """ + return self.as_string(unixfrom, policy=policy).encode("utf-8") + + def parse_email(data: bytes | str) -> tuple[RawMetadata, dict[str, list[str]]]: """Parse a distribution's metadata stored as email headers (e.g. from ``METADATA``). @@ -859,3 +896,35 @@ def from_email(cls, data: bytes | str, *, validate: bool = True) -> Metadata: """``Provides`` (deprecated)""" obsoletes: _Validator[list[str] | None] = _Validator(added="1.1") """``Obsoletes`` (deprecated)""" + + def as_rfc822(self) -> RFC822Message: + """ + Return an RFC822 message with the metadata. + """ + message = RFC822Message() + self._write_metadata(message) + return message + + def _write_metadata(self, message: RFC822Message) -> None: + """ + Return an RFC822 message with the metadata. + """ + for name, validator in self.__class__.__dict__.items(): + if isinstance(validator, _Validator) and name != "description": + value = getattr(self, name) + email_name = _RAW_TO_EMAIL_MAPPING[name] + if value is not None: + if email_name == "project-url": + for label, url in value.items(): + message[email_name] = f"{label}, {url}" + elif email_name == "keywords": + message[email_name] = ",".join(value) + elif isinstance(value, list): + for item in value: + message[email_name] = str(item) + else: + message[email_name] = str(value) + + # The description is a special case because it is in the body of the message. + if self.description is not None: + message.set_payload(self.description) diff --git a/tests/test_metadata.py b/tests/test_metadata.py index 931ca206..27f546ca 100644 --- a/tests/test_metadata.py +++ b/tests/test_metadata.py @@ -1,4 +1,9 @@ +from __future__ import annotations + +import email.message +import inspect import pathlib +import textwrap import pytest @@ -763,3 +768,319 @@ def test_invalid_license_files(self, license_files): with pytest.raises(metadata.InvalidMetadata): meta.license_files # noqa: B018 + + +class TestMetadataWriting: + def test_write_metadata(self): + meta = metadata.Metadata.from_raw(_RAW_EXAMPLE) + written = meta.as_rfc822().as_string() + assert ( + written == "metadata-version: 2.3\nname: packaging\nversion: 2023.0.0\n\n" + ) + + def test_write_metadata_with_description(self): + # Intentionally out of order to make sure it is written in order + meta = metadata.Metadata.from_raw( + { + "version": "1.2.3", + "name": "Hello", + "description": "Hello\n\nWorld👋", + "metadata_version": "2.3", + } + ) + written = meta.as_rfc822().as_string() + assert ( + written == "metadata-version: 2.3\nname: Hello\n" + "version: 1.2.3\n\nHello\n\nWorld👋" + ) + written = meta.as_rfc822().as_bytes() + assert ( + written + == "metadata-version: 2.3\nname: Hello\n" + "version: 1.2.3\n\nHello\n\nWorld👋".encode() + ) + + def test_multiline_license(self): + meta = metadata.Metadata.from_raw( + { + "version": "1.2.3", + "name": "packaging", + "license": "Hello\nWorld🐍", + "metadata_version": "2.3", + } + ) + written = meta.as_rfc822().as_string() + assert ( + written == "metadata-version: 2.3\nname: packaging\nversion: 1.2.3" + "\nlicense: Hello\n World🐍\n\n" + ) + written = meta.as_rfc822().as_bytes() + assert ( + written + == "metadata-version: 2.3\nname: packaging\nversion: 1.2.3" + "\nlicense: Hello\n World🐍\n\n".encode() + ) + + def test_large(self): + meta = metadata.Metadata.from_raw( + { + "author": "Example!", + "author_email": "Unknown ", + "classifiers": [ + "Development Status :: 4 - Beta", + "Programming Language :: Python", + ], + "description": "some readme 👋\n", + "description_content_type": "text/markdown", + "keywords": ["trampolim", "is", "interesting"], + "license": "some license text", + "maintainer_email": "Other Example ", + "metadata_version": "2.1", + "name": "full_metadata", + "project_urls": { + "homepage": "example.com", + "documentation": "readthedocs.org", + "repository": "github.com/some/repo", + "changelog": "github.com/some/repo/blob/master/CHANGELOG.rst", + }, + "provides_extra": ["test"], + "requires_dist": [ + "dependency1", + "dependency2>1.0.0", + "dependency3[extra]", + 'dependency4; os_name != "nt"', + 'dependency5[other-extra]>1.0; os_name == "nt"', + 'test_dependency; extra == "test"', + 'test_dependency[test_extra]; extra == "test"', + "test_dependency[test_extra2]>3.0; " + 'os_name == "nt" and extra == "test"', + ], + "requires_python": ">=3.8", + "summary": "A package with all the metadata :)", + "version": "3.2.1", + } + ) + + core_metadata = meta.as_rfc822() + assert core_metadata.items() == [ + ("metadata-version", "2.1"), + ("name", "full_metadata"), + ("version", "3.2.1"), + ("summary", "A package with all the metadata :)"), + ("description-content-type", "text/markdown"), + ("keywords", "trampolim,is,interesting"), + ("author", "Example!"), + ("author-email", "Unknown "), + ("maintainer-email", "Other Example "), + ("license", "some license text"), + ("classifier", "Development Status :: 4 - Beta"), + ("classifier", "Programming Language :: Python"), + ("requires-dist", "dependency1"), + ("requires-dist", "dependency2>1.0.0"), + ("requires-dist", "dependency3[extra]"), + ("requires-dist", 'dependency4; os_name != "nt"'), + ("requires-dist", 'dependency5[other-extra]>1.0; os_name == "nt"'), + ("requires-dist", 'test_dependency; extra == "test"'), + ("requires-dist", 'test_dependency[test_extra]; extra == "test"'), + ( + "requires-dist", + 'test_dependency[test_extra2]>3.0; os_name == "nt" and extra == "test"', + ), + ("requires-python", ">=3.8"), + ("project-url", "homepage, example.com"), + ("project-url", "documentation, readthedocs.org"), + ("project-url", "repository, github.com/some/repo"), + ( + "project-url", + "changelog, github.com/some/repo/blob/master/CHANGELOG.rst", + ), + ("provides-extra", "test"), + ] + + assert core_metadata.get_payload() == "some readme 👋\n" + + def test_modern_license(self): + meta = metadata.Metadata.from_raw( + { + "metadata_version": "2.4", + "name": "full_metadata", + "version": "3.2.1", + "license_expression": "MIT", + "license_files": ["LICENSE.txt", "LICENSE"], + } + ) + + core_metadata = meta.as_rfc822() + assert core_metadata.items() == [ + ("metadata-version", "2.4"), + ("name", "full_metadata"), + ("version", "3.2.1"), + ("license-expression", "MIT"), + ("license-file", "LICENSE.txt"), + ("license-file", "LICENSE"), + ] + + assert core_metadata.get_payload() is None + + @pytest.mark.parametrize( + ("items", "data"), + [ + pytest.param( + [], + "", + id="empty", + ), + pytest.param( + [ + ("Foo", "Bar"), + ], + "Foo: Bar\n", + id="simple", + ), + pytest.param( + [ + ("Foo", "Bar"), + ("Foo2", "Bar2"), + ], + """\ + Foo: Bar + Foo2: Bar2 + """, + id="multiple", + ), + pytest.param( + [ + ("Foo", "Unicøde"), + ], + "Foo: Unicøde\n", + id="unicode", + ), + pytest.param( + [ + ("Foo", "🕵️"), + ], + "Foo: 🕵️\n", + id="emoji", + ), + pytest.param( + [ + ("Item", None), + ], + "", + id="none", + ), + pytest.param( + [ + ("ItemA", "ValueA"), + ("ItemB", "ValueB"), + ("ItemC", "ValueC"), + ], + """\ + ItemA: ValueA + ItemB: ValueB + ItemC: ValueC + """, + id="order 1", + ), + pytest.param( + [ + ("ItemB", "ValueB"), + ("ItemC", "ValueC"), + ("ItemA", "ValueA"), + ], + """\ + ItemB: ValueB + ItemC: ValueC + ItemA: ValueA + """, + id="order 2", + ), + pytest.param( + [ + ("ItemA", "ValueA1"), + ("ItemB", "ValueB"), + ("ItemC", "ValueC"), + ("ItemA", "ValueA2"), + ], + """\ + ItemA: ValueA1 + ItemB: ValueB + ItemC: ValueC + ItemA: ValueA2 + """, + id="multiple keys", + ), + pytest.param( + [ + ("ItemA", "ValueA"), + ("ItemB", "ValueB1\nValueB2\nValueB3"), + ("ItemC", "ValueC"), + ], + """\ + ItemA: ValueA + ItemB: ValueB1 + ValueB2 + ValueB3 + ItemC: ValueC + """, + id="multiline", + ), + ], + ) + def test_headers(self, items: list[tuple[str, None | str]], data: str) -> None: + message = metadata.RFC822Message() + + for name, value in items: + if value: + message[name] = value + + data = textwrap.dedent(data) + "\n" + assert str(message) == data + assert bytes(message) == data.encode() + + assert email.message_from_string(str(message)).items() == [ + (a, "\n ".join(b.splitlines())) for a, b in items if b is not None + ] + + def test_body(self) -> None: + message = metadata.RFC822Message() + + message["ItemA"] = "ValueA" + message["ItemB"] = "ValueB" + message["ItemC"] = "ValueC" + body = inspect.cleandoc( + """ + Lorem ipsum dolor sit amet, consectetur adipiscing elit. Mauris + congue semper fermentum. Nunc vitae tempor ante. Aenean aliquet + posuere lacus non faucibus. In porttitor congue luctus. Vivamus eu + dignissim orci. Donec egestas mi ac ipsum volutpat, vel elementum + sapien consectetur. Praesent dictum finibus fringilla. Sed vel + feugiat leo. Nulla a pharetra augue, at tristique metus. + + Aliquam fermentum elit at risus sagittis, vel pretium augue congue. + Donec leo risus, faucibus vel posuere efficitur, feugiat ut leo. + Aliquam vestibulum vel dolor id elementum. Ut bibendum nunc interdum + neque interdum, vel tincidunt lacus blandit. Ut volutpat + sollicitudin dapibus. Integer vitae lacinia ex, eget finibus nulla. + Donec sit amet ante in neque pulvinar faucibus sed nec justo. Fusce + hendrerit massa libero, sit amet pulvinar magna tempor quis. ø + """ + ) + headers = inspect.cleandoc( + """ + ItemA: ValueA + ItemB: ValueB + ItemC: ValueC + """ + ) + full = f"{headers}\n\n{body}" + + message.set_payload(textwrap.dedent(body)) + + assert str(message) == full + + new_message = email.message_from_string(str(message)) + assert new_message.items() == message.items() + assert new_message.get_payload() == message.get_payload() + + assert bytes(message) == full.encode("utf-8")