Skip to content

Commit

Permalink
feat: support writing metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Henry Schreiner <[email protected]>
  • Loading branch information
henryiii committed Oct 24, 2024
1 parent 6c338a8 commit ef4fd77
Show file tree
Hide file tree
Showing 2 changed files with 277 additions and 2 deletions.
113 changes: 111 additions & 2 deletions src/packaging/metadata.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

import builtins
import dataclasses
import email.feedparser
import email.header
import email.message
import email.parser
import email.policy
import sys
import typing
from typing import (
Any,
Expand All @@ -22,8 +24,8 @@
T = typing.TypeVar("T")


if "ExceptionGroup" in builtins.__dict__: # pragma: no cover
ExceptionGroup = ExceptionGroup
if sys.version_info >= (3, 11):
ExceptionGroup = builtins.ExceptionGroup
else: # pragma: no cover

class ExceptionGroup(Exception):
Expand Down Expand Up @@ -269,6 +271,72 @@ def _get_payload(msg: email.message.Message, source: bytes | str) -> str:
"version": "version",
}
_RAW_TO_EMAIL_MAPPING = {raw: email for email, raw in _EMAIL_TO_RAW_MAPPING.items()}
_MULTI_FIELDS = {_RAW_TO_EMAIL_MAPPING[x] for x in _LIST_FIELDS | _DICT_FIELDS}


@dataclasses.dataclass
class _JSonMessageSetter:
"""
This provides an API to build a JSON message output in the same way as the
classic Message. Line breaks are preserved this way.
"""

data: dict[str, str | list[str]]

def __setitem__(self, name: str, value: str | None) -> None:
key = name.replace("-", "_")
if value is None:
return

if name == "keywords":
values = (x.strip() for x in value.split(","))
self.data[key] = [x for x in values if x]
elif name in _MULTI_FIELDS:
entry = self.data.setdefault(key, [])
assert isinstance(entry, list)
entry.append(value)
else:
self.data[key] = value

def set_payload(self, payload: str) -> None:
self["description"] = payload


# This class is for writing RFC822 messages
class RFC822Policy(email.policy.EmailPolicy):
"""
This is :class:`email.policy.EmailPolicy`, but with a simple ``header_store_parse``
implementation that handles multiline values, and some nice defaults.
"""

utf8 = True
mangle_from_ = False
max_line_length = 0

def header_store_parse(self, name: str, value: str) -> tuple[str, str]:
size = len(name) + 2
value = value.replace("\n", "\n" + " " * size)
return (name, value)


# This class is for writing RFC822 messages
class RFC822Message(email.message.EmailMessage):
"""
This is :class:`email.message.EmailMessage` with two small changes: it defaults to
our `RFC822Policy`, and it correctly writes unicode when being called
with `bytes()`.
"""

def __init__(self) -> None:
super().__init__(policy=RFC822Policy())

def as_bytes(
self, unixfrom: bool = False, policy: email.policy.Policy | None = None
) -> bytes:
"""
This handles unicode encoding.
"""
return self.as_string(unixfrom, policy=policy).encode("utf-8")


def parse_email(data: bytes | str) -> tuple[RawMetadata, dict[str, list[str]]]:
Expand Down Expand Up @@ -806,3 +874,44 @@ def from_email(cls, data: bytes | str, *, validate: bool = True) -> Metadata:
"""``Provides`` (deprecated)"""
obsoletes: _Validator[list[str] | None] = _Validator(added="1.1")
"""``Obsoletes`` (deprecated)"""

def as_rfc822(self) -> RFC822Message:
"""
Return an RFC822 message with the metadata.
"""
message = RFC822Message()
self._write_metadata(message)
return message

def as_json(self) -> dict[str, str | list[str]]:
"""
Return a JSON message with the metadata.
"""
message: dict[str, str | list[str]] = {}
smart_message = _JSonMessageSetter(message)
self._write_metadata(smart_message)
return message

def _write_metadata(self, message: RFC822Message | _JSonMessageSetter) -> None:
"""
Return an RFC822 message with the metadata.
"""
for name, validator in self.__class__.__dict__.items():
if isinstance(validator, _Validator) and name != "description":
value = getattr(self, name)
email_name = _RAW_TO_EMAIL_MAPPING[name]
if value is not None:
if email_name == "project-url":
for label, url in value.items():
message[email_name] = f"{label}, {url}"
elif email_name == "keywords":
message[email_name] = ",".join(value)
elif isinstance(value, list):
for item in value:
message[email_name] = str(item)
else:
message[email_name] = str(value)

# The description is a special case because it is in the body of the message.
if self.description is not None:
message.set_payload(self.description)
166 changes: 166 additions & 0 deletions tests/test_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,3 +622,169 @@ def test_disallowed_dynamic(self, field_name):
def test_optional_defaults_to_none(self, field_name):
meta = metadata.Metadata.from_raw({}, validate=False)
assert getattr(meta, field_name) is None


class TestMetadataWriting:
def test_write_metadata(self):
meta = metadata.Metadata.from_raw(_RAW_EXAMPLE)
written = meta.as_rfc822().as_string()
assert (
written == "metadata-version: 2.3\nname: packaging\nversion: 2023.0.0\n\n"
)

def test_write_metadata_with_description(self):
# Intentionally out of order to make sure it is written in order
meta = metadata.Metadata.from_raw(
{
"version": "1.2.3",
"name": "Hello",
"description": "Hello\n\nWorld👋",
"metadata_version": "2.3",
}
)
written = meta.as_rfc822().as_string()
assert (
written == "metadata-version: 2.3\nname: Hello\n"
"version: 1.2.3\n\nHello\n\nWorld👋"
)
written = meta.as_rfc822().as_bytes()
assert (
written
== "metadata-version: 2.3\nname: Hello\n"
"version: 1.2.3\n\nHello\n\nWorld👋".encode()
)

def test_multiline_license(self):
meta = metadata.Metadata.from_raw(
{
"version": "1.2.3",
"name": "packaging",
"license": "Hello\nWorld🐍",
"metadata_version": "2.3",
}
)
written = meta.as_rfc822().as_string()
assert (
written == "metadata-version: 2.3\nname: packaging\nversion: 1.2.3"
"\nlicense: Hello\n World🐍\n\n"
)
written = meta.as_rfc822().as_bytes()
assert (
written
== "metadata-version: 2.3\nname: packaging\nversion: 1.2.3"
"\nlicense: Hello\n World🐍\n\n".encode()
)

def test_large(self):
meta = metadata.Metadata.from_raw(
{
"author": "Example!",
"author_email": "Unknown <[email protected]>",
"classifiers": [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
],
"description": "some readme 👋\n",
"description_content_type": "text/markdown",
"keywords": ["trampolim", "is", "interesting"],
"license": "some license text",
"maintainer_email": "Other Example <[email protected]>",
"metadata_version": "2.1",
"name": "full_metadata",
"project_urls": {
"homepage": "example.com",
"documentation": "readthedocs.org",
"repository": "github.com/some/repo",
"changelog": "github.com/some/repo/blob/master/CHANGELOG.rst",
},
"provides_extra": ["test"],
"requires_dist": [
"dependency1",
"dependency2>1.0.0",
"dependency3[extra]",
'dependency4; os_name != "nt"',
'dependency5[other-extra]>1.0; os_name == "nt"',
'test_dependency; extra == "test"',
'test_dependency[test_extra]; extra == "test"',
"test_dependency[test_extra2]>3.0; "
'os_name == "nt" and extra == "test"',
],
"requires_python": ">=3.8",
"summary": "A package with all the metadata :)",
"version": "3.2.1",
}
)

assert meta.as_json() == {
"author": "Example!",
"author_email": "Unknown <[email protected]>",
"classifier": [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
],
"description": "some readme 👋\n",
"description_content_type": "text/markdown",
"keywords": ["trampolim", "is", "interesting"],
"license": "some license text",
"maintainer_email": "Other Example <[email protected]>",
"metadata_version": "2.1",
"name": "full_metadata",
"project_url": [
"homepage, example.com",
"documentation, readthedocs.org",
"repository, github.com/some/repo",
"changelog, github.com/some/repo/blob/master/CHANGELOG.rst",
],
"provides_extra": ["test"],
"requires_dist": [
"dependency1",
"dependency2>1.0.0",
"dependency3[extra]",
'dependency4; os_name != "nt"',
'dependency5[other-extra]>1.0; os_name == "nt"',
'test_dependency; extra == "test"',
'test_dependency[test_extra]; extra == "test"',
'test_dependency[test_extra2]>3.0; os_name == "nt" and extra == "test"',
],
"requires_python": ">=3.8",
"summary": "A package with all the metadata :)",
"version": "3.2.1",
}

core_metadata = meta.as_rfc822()
assert core_metadata.items() == [
("metadata-version", "2.1"),
("name", "full_metadata"),
("version", "3.2.1"),
("summary", "A package with all the metadata :)"),
("description-content-type", "text/markdown"),
("keywords", "trampolim,is,interesting"),
("author", "Example!"),
("author-email", "Unknown <[email protected]>"),
("maintainer-email", "Other Example <[email protected]>"),
("license", "some license text"),
("classifier", "Development Status :: 4 - Beta"),
("classifier", "Programming Language :: Python"),
("requires-dist", "dependency1"),
("requires-dist", "dependency2>1.0.0"),
("requires-dist", "dependency3[extra]"),
("requires-dist", 'dependency4; os_name != "nt"'),
("requires-dist", 'dependency5[other-extra]>1.0; os_name == "nt"'),
("requires-dist", 'test_dependency; extra == "test"'),
("requires-dist", 'test_dependency[test_extra]; extra == "test"'),
(
"requires-dist",
'test_dependency[test_extra2]>3.0; os_name == "nt" and extra == "test"',
),
("requires-python", ">=3.8"),
("project-url", "homepage, example.com"),
("project-url", "documentation, readthedocs.org"),
("project-url", "repository, github.com/some/repo"),
(
"project-url",
"changelog, github.com/some/repo/blob/master/CHANGELOG.rst",
),
("provides-extra", "test"),
]

assert core_metadata.get_payload() == "some readme 👋\n"

0 comments on commit ef4fd77

Please sign in to comment.