Skip to content

Commit

Permalink
Add type annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
nineteendo committed Aug 12, 2024
1 parent edffac7 commit 1159f22
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 71 deletions.
14 changes: 1 addition & 13 deletions src/jsonyx/_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,7 @@ class JSONSyntaxError(SyntaxError):
def __init__(
self, msg: str, filename: str, doc: str, start: int, end: int = 0,
) -> None:
"""Create a new JSON syntax error.
:param msg: an error message
:type msg: str
:param filename: the path to the JSON file
:type filename: str
:param doc: a JSON string
:type doc: str
:param start: the start position
:type start: int
:param end: the end position, defaults to 0
:type end: int, optional
"""
"""Create a new JSON syntax error."""
lineno: int = (
doc.count("\n", 0, start)
+ doc.count("\r", 0, start)
Expand Down
178 changes: 120 additions & 58 deletions src/jsonyx/patch.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,56 @@
# Copyright (C) 2024 Nice Zombies
"""JSON patcher."""
# TODO(Nice Zombies): add error messages
# TODO(Nice Zombies): add type annotations
# TODO(Nice Zombies): export API
# TODO(Nice Zombies): raise JSONSyntaxError
# TODO(Nice Zombies): remove executable code
# TODO(Nice Zombies): update command line options
# TODO(Nice Zombies): write documentation
# TODO(Nice Zombies): write tests
from __future__ import annotations

__all__: list[str] = ["patch"]

import re
from math import isinf
from operator import eq, ge, gt, le, lt, ne
from re import DOTALL, MULTILINE, VERBOSE
from re import DOTALL, MULTILINE, VERBOSE, Match, RegexFlag
from sys import maxsize
from typing import TYPE_CHECKING, Any

from jsonyx import dump

input_json = []
patch_json = [
if TYPE_CHECKING:
from collections.abc import Callable

input_json: Any = []
patch_json: list[dict[str, Any]] = [
{
"op": "insert",
"path": "$[0]",
"value": "value",
},
]

_FLAGS = VERBOSE | MULTILINE | DOTALL
_FLAGS: RegexFlag = VERBOSE | MULTILINE | DOTALL

_match_idx = re.compile(r"end|-?(?:0|[1-9]\d*)", _FLAGS).match
_match_key_chunk = re.compile(r"[^!&.<=>[\]~]*", _FLAGS).match
_match_number = re.compile(
_match_idx: Callable[[str, int], Match[str] | None] = re.compile(
r"end|-?(?:0|[1-9]\d*)", _FLAGS,
).match
_match_key_chunk: Callable[[str, int], Match[str] | None] = re.compile(
r"[^!&.<=>[\]~]*", _FLAGS,
).match
_match_number: Callable[[str, int], Match[str] | None] = re.compile(
r"(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?", _FLAGS,
).match
_match_str_chunk = re.compile(r"[^'~]*", _FLAGS).match
_match_str_chunk: Callable[[str, int], Match[str] | None] = re.compile(
r"[^'~]*", _FLAGS,
).match


def _get_key(s, end=0):
chunks = []
append_chunk = chunks.append
def _get_key(s: str, end: int = 0) -> tuple[str, int]:
chunks: list[str] = []
append_chunk: Callable[[str], None] = chunks.append
while True:
if match := _match_key_chunk(s, end):
end = match.end()
Expand All @@ -45,7 +61,7 @@ def _get_key(s, end=0):

end += 1
try:
esc = s[end]
esc: str = s[end]
except IndexError:
raise SyntaxError from None

Expand All @@ -56,31 +72,39 @@ def _get_key(s, end=0):
append_chunk(esc)


def _get_targets(node):
def _get_targets(
node: tuple[dict[Any, Any] | list[Any], int | slice | str],
) -> list[dict[Any, Any] | list[Any]]:
target, key = node
if isinstance(target, dict) and not isinstance(key, str):
raise TypeError

if isinstance(target, list) and isinstance(key, str):
raise TypeError

targets = target[key] if isinstance(key, slice) else [target[key]]
if isinstance(key, slice):
targets: list[Any] = target[key]
else:
targets = [target[key]] # type: ignore

if not all(isinstance(target, (dict, list)) for target in targets):
raise TypeError

return targets


def _get_idx(match):
def _get_idx(match: Match[str]) -> tuple[int, int]:
if (group := match.group()) == "end":
idx = maxsize
idx: int = maxsize
else:
idx = int(group)

return idx, match.end()


def _get_operator(s, end):
def _get_operator(
s: str, end: int,
) -> tuple[Callable[[Any, Any], Any] | None, int]:
if s[end:end + 1] == "<":
operator, end = lt, end + 1
elif s[end:end + 1] == "<=":
Expand All @@ -99,16 +123,16 @@ def _get_operator(s, end):
return operator, end


def _get_str(s, end):
chunks = []
append_chunk = chunks.append
def _get_str(s: str, end: int) -> tuple[str, int]:
chunks: list[str] = []
append_chunk: Callable[[str], None] = chunks.append
while True:
if match := _match_str_chunk(s, end):
end = match.end()
append_chunk(match.group())

try:
terminator = s[end]
terminator: str = s[end]
except IndexError:
raise SyntaxError from None

Expand All @@ -117,7 +141,7 @@ def _get_str(s, end):

end += 1
try:
esc = s[end]
esc: str = s[end]
except IndexError:
raise SyntaxError from None

Expand All @@ -130,12 +154,14 @@ def _get_str(s, end):

# TODO(Nice Zombies): allow_nan_and_infinity=False
# TODO(Nice Zombies): use_decimal=True
def _get_value(s, idx):
# pylint: disable-next=R0912
def _get_value(s: str, idx: int) -> tuple[Any, int]: # noqa: C901, PLR0912
try:
nextchar = s[idx]
nextchar: str = s[idx]
except IndexError:
raise SyntaxError from None

value: Any
if nextchar == "'":
value, end = _get_str(s, idx + 1)
elif nextchar == "n" and s[idx:idx + 4] == "null":
Expand Down Expand Up @@ -165,7 +191,11 @@ def _get_value(s, idx):
return value, end


def _run_query(nodes, path, end):
def _run_query( # noqa: C901
nodes: list[tuple[dict[Any, Any] | list[Any], int | slice | str]],
s: str,
end: int,
) -> tuple[list[tuple[dict[Any, Any] | list[Any], int | slice | str]], int]:
nodes = [
(target, key)
for node in nodes
Expand All @@ -178,57 +208,73 @@ def _run_query(nodes, path, end):
]

while True:
key, end = _get_key(path, end)
key: int | slice | str
key, end = _get_key(s, end)
if key == "?":
negate_filter = False
negate_filter: bool = False
elif key == "!":
negate_filter = True
else:
raise SyntaxError

filter_nodes, end = _traverse(nodes, path, end, single=True)
pairs = [
(node, target[key])
for node, (target, key) in zip(nodes, filter_nodes, strict=True)
filter_nodes, end = _traverse(nodes, s, end, single=True)
pairs: list[
tuple[tuple[dict[Any, Any] | list[Any], int | slice | str], Any]
] = []
for node, (target, key) in zip(nodes, filter_nodes):
if isinstance(target, dict) and not isinstance(key, str):
raise TypeError

if isinstance(target, list) and isinstance(key, str):
raise TypeError

if (
key in target
if isinstance(target, dict) else
-len(target) <= key < len(target)
) != negate_filter
]
-len(target) <= key < len(target) # type: ignore
) != negate_filter:
pairs.append((node, target[key])) # type: ignore

operator, end = _get_operator(path, end)
operator, end = _get_operator(s, end)
if operator is None:
nodes = [node for node, _target in pairs]
elif negate_filter:
raise SyntaxError
else:
value, end = _get_value(path, end)
value, end = _get_value(s, end)
nodes = [node for node, target in pairs if operator(target, value)]

if path[end:end + 2] != "&&":
if s[end:end + 2] != "&&":
return nodes, end

end += 2


def _traverse(nodes, path, end, *, single=False):
# pylint: disable-next=R0912
def _traverse( # noqa: C901, PLR0912
nodes: list[tuple[dict[Any, Any] | list[Any], int | slice | str]],
s: str,
end: int,
*,
single: bool = False,
) -> tuple[list[tuple[dict[Any, Any] | list[Any], int | slice | str]], int]:
while True:
if (terminator := path[end:end + 1]) == ".":
key, end = _get_key(path, end + 1)
key: int | slice | str
if (terminator := s[end:end + 1]) == ".":
key, end = _get_key(s, end + 1)
nodes = [
(target, key)
for node in nodes
for target in _get_targets(node)
]
elif terminator == "[":
if match := _match_idx(path, end + 1):
if match := _match_idx(s, end + 1):
idx, end = _get_idx(match)
if path[end:end + 1] != ":":
if s[end:end + 1] != ":":
key = idx
elif single:
raise SyntaxError
elif match := _match_idx(path, end + 1):
elif match := _match_idx(s, end + 1):
idx2, end = _get_idx(match)
key = slice(idx, idx2)
else:
Expand All @@ -242,10 +288,10 @@ def _traverse(nodes, path, end, *, single=False):
elif single:
raise SyntaxError
else:
nodes, end = _run_query(nodes, path, end + 1)
nodes, end = _run_query(nodes, s, end + 1)

try:
terminator = path[end]
terminator = s[end]
except IndexError:
raise SyntaxError from None

Expand All @@ -257,24 +303,40 @@ def _traverse(nodes, path, end, *, single=False):
return nodes, end


def _traverser(nodes, path):
key, end = _get_key(path)
def _traverser(
nodes: list[tuple[dict[Any, Any] | list[Any], int | slice | str]], s: str,
) -> list[tuple[dict[Any, Any] | list[Any], int | slice | str]]:
key, end = _get_key(s)
if key != "$":
raise SyntaxError

nodes, end = _traverse(nodes, path, end)
if end < len(path):
nodes, end = _traverse(nodes, s, end)
if end < len(s):
raise SyntaxError

return nodes


def patch(obj, operations):
root = [obj]
nodes = [(root, 0)]
# pylint: disable-next=R0912
def patch( # noqa: C901, PLR0912
obj: Any, operations: list[dict[str, Any]],
) -> Any:
"""Patch a Python object with a list of operations.
:param obj: a Python object
:type obj: Any
:param operations: a list of operations
:type operations: list[dict[str, Any]]
:return: the patched Python object
:rtype: Any
"""
root: list[Any] = [obj]
nodes: list[tuple[dict[Any, Any] | list[Any], int | slice | str]] = [
(root, 0),
]
for operation in operations:
op = operation["op"]
path = operation["path"]
op: str = operation["op"]
path: str = operation["path"]
if op == "del":
# Reverse to preserve indices for queries
for target, key in reversed(_traverser(nodes, path)):
Expand All @@ -287,9 +349,9 @@ def patch(obj, operations):
if isinstance(target, list) and isinstance(key, str):
raise TypeError

del target[key]
del target[key] # type: ignore
elif op == "insert":
value = operation["value"]
value: Any = operation["value"]
# Reverse to preserve indices for queries
for target, key in reversed(_traverser(nodes, path)):
if target is root:
Expand All @@ -311,7 +373,7 @@ def patch(obj, operations):
if isinstance(target, list) and isinstance(key, str):
raise TypeError

target[key] = value
target[key] = value # type: ignore
else:
raise ValueError

Expand Down

0 comments on commit 1159f22

Please sign in to comment.