diff --git a/dictdatabase/utils.py b/dictdatabase/utils.py index 54ede4a..a3fb9fc 100644 --- a/dictdatabase/utils.py +++ b/dictdatabase/utils.py @@ -66,28 +66,36 @@ def seek_index_through_value(data: str, index: int) -> int: :param data: The string to be parsed :param index: the index of the first character of the value """ - in_str, list_depth, dict_depth = False, 0, 0 - d_prev, d_curr = None, data[index - 1] + # See https://www.json.org/json-en.html for the JSON syntax + + skip_next, in_str, list_depth, dict_depth = False, False, 0, 0 + for i in range(index, len(data)): - d_prev, d_curr = d_curr, data[i] - prev_backslash = d_prev == "\\" - if d_curr == '"' and not prev_backslash: - in_str = not in_str + if skip_next: + skip_next = False continue - if in_str or d_curr == " " or prev_backslash: + current = data[i] + if current == "\\": + skip_next = True continue - if d_curr == "[": + if current == '"': + in_str = not in_str + if in_str or current == " ": + continue + if current == "[": list_depth += 1 - elif d_curr == "]": + elif current == "]": list_depth -= 1 - elif d_curr == "{": + elif current == "{": dict_depth += 1 - elif d_curr == "}": + elif current == "}": dict_depth -= 1 if list_depth == 0 and dict_depth == 0: return i + 1 + raise TypeError("Invalid JSON syntax") + def count_nesting(data: str, start: int, end: int) -> int: """ diff --git a/tests/test_partial.py b/tests/test_partial.py index 5062b13..c943ec3 100644 --- a/tests/test_partial.py +++ b/tests/test_partial.py @@ -13,8 +13,8 @@ def test_subread(env, use_compression, use_orjson, sort_keys, indent): } DDB.at(name).create(j, force_overwrite=True) - with pytest.raises(json.decoder.JSONDecodeError): - DDB.at(name).read("a") == "Hello}{" + + assert DDB.at(name).read("a") == "Hello{}" with pytest.raises(KeyError): DDB.at(name).read("f") diff --git a/tests/test_read.py b/tests/test_read.py index 9c8c025..95462e6 100644 --- a/tests/test_read.py +++ b/tests/test_read.py @@ -1,6 +1,7 @@ -from unicodedata import name + import dictdatabase as DDB import pytest +import json from tests.utils import make_complex_nested_random_dict @@ -9,6 +10,31 @@ def test_non_existent(env, use_compression, use_orjson, sort_keys, indent): assert d is None +def test_read_integrity(): + cases = [ + r'{"a": "\\", "b": 2}', + r'{"a": "\\\\", "b": 2}', + r'{"a": "\\\\\"", "b": 2}', + r'{"a": "\\\"\\", "b": 2}', + r'{"a": "\"\\\\", "b": 2}', + r'{"a": "\"", "b": 2}', + r'{"a": "\"\"", "b": 2}', + r'{"a": "\"\"\\", "b": 2}', + r'{"a": "\"\\\"", "b": 2}', + r'{"a": "\\\"\"", "b": 2}', + ] + + for case in cases: + with open(f"{DDB.config.storage_directory}/test_read_integrity.json", "w") as f: + f.write(case) + dd = DDB.at("test_read_integrity").read(key="a") + assert dd == json.loads(case)["a"] + + + + + + def test_create_and_read(env, use_compression, use_orjson, sort_keys, indent): name = "test_create_and_read" d = make_complex_nested_random_dict(12, 6)