diff --git a/tools/__pycache__/generated_classes.cpython-311.pyc b/tools/__pycache__/generated_classes.cpython-311.pyc deleted file mode 100644 index 1ffff5c6..00000000 Binary files a/tools/__pycache__/generated_classes.cpython-311.pyc and /dev/null differ diff --git a/tools/__pycache__/utils.cpython-39.pyc b/tools/__pycache__/utils.cpython-39.pyc deleted file mode 100644 index b8609f4b..00000000 Binary files a/tools/__pycache__/utils.cpython-39.pyc and /dev/null differ diff --git a/tools/example-api-usage.py b/tools/example-api-usage.py deleted file mode 100644 index dab9a982..00000000 --- a/tools/example-api-usage.py +++ /dev/null @@ -1,3 +0,0 @@ -import python_api - - diff --git a/tools/generate_mosaic_schema_wrapper.py b/tools/generate_mosaic_schema_wrapper.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tools/generate_schema_wrapper.py b/tools/generate_schema_wrapper.py deleted file mode 100644 index e51f4b92..00000000 --- a/tools/generate_schema_wrapper.py +++ /dev/null @@ -1,114 +0,0 @@ -import json -from typing import Any, Dict, List, Union, Final, Iterable, Iterator, Literal -import sys -import yaml -import argparse -import copy -import re -import textwrap -from dataclasses import dataclass -from itertools import chain -from pathlib import Path -from urllib import request - -sys.path.insert(0, str(Path.cwd())) -from tools.schemapi import CodeSnippet, SchemaInfo, codegen -from tools.schemapi.utils import ( - TypeAliasTracer, - get_valid_identifier, - indent_docstring, - resolve_references, - rst_parse, - rst_syntax_for_class, - ruff_format_py, - ruff_write_lint_format_str, - spell_literal, -) - -def generate_class(class_name: str, class_schema: Dict[str, Any]) -> str: - - imports = "from typing import Any, Union\n" - - if 'type' in class_schema and 'properties' not in class_schema: - return f"class {class_name}:\n def __init__(self):\n pass\n" - - if '$ref' in class_schema: - ref_class_name = class_schema['$ref'].split('/')[-1] - return f"{imports}\nclass {class_name}:\n pass # This is a reference to {ref_class_name}\n" - - if 'anyOf' in class_schema: - return generate_any_of_class(class_name, class_schema['anyOf']) - - properties = class_schema.get('properties', {}) - required = class_schema.get('required', []) - - class_def = f"{imports}class {class_name}:\n" - class_def += " def __init__(self" - - for prop, prop_schema in properties.items(): - type_hint = get_type_hint(prop_schema) - if prop in required: - class_def += f", {prop}: {type_hint}" - else: - class_def += f", {prop}: {type_hint} = None" - - class_def += "):\n" - - for prop in properties: - class_def += f" self.{prop} = {prop}\n" - - return class_def - - -def generate_any_of_class(class_name: str, any_of_schemas: List[Dict[str, Any]]) -> str: - types = [get_type_hint(schema) for schema in any_of_schemas] - type_union = "Union[" + ", ".join(f'"{t}"' for t in types) + "]" - - class_def = f"class {class_name}:\n" - class_def += f" def __init__(self, value: {type_union}):\n" - class_def += " self.value = value\n" - - return class_def - - - -def get_type_hint(prop_schema: Dict[str, Any]) -> str: - """Get type hint for a property schema.""" - if 'type' in prop_schema: - if prop_schema['type'] == 'string': - return 'str' - elif prop_schema['type'] == 'boolean': - return 'bool' - elif prop_schema['type'] == 'object': - return 'Dict[str, Any]' - elif 'anyOf' in prop_schema: - types = [get_type_hint(option) for option in prop_schema['anyOf']] - return f'Union[{", ".join(types)}]' - elif '$ref' in prop_schema: - 
return prop_schema['$ref'].split('/')[-1] - return 'Any' - -def load_schema(schema_path: Path) -> dict: - """Load a JSON schema from the specified path.""" - with schema_path.open(encoding="utf8") as f: - return json.load(f) - -def generate_schema_wrapper(schema_file: Path, output_file: Path) -> str: - """Generate a schema wrapper for the given schema file.""" - rootschema = load_schema(schema_file) - - definitions: Dict[str, str] = {} - - for name, schema in rootschema.get("definitions", {}).items(): - class_code = generate_class(name, schema) - definitions[name] = class_code - - generated_classes = "\n\n".join(definitions.values()) - - with open(output_file, 'w') as f: - f.write(generated_classes) - -if __name__ == "__main__": - schema_file = "tools/testingSchema.json" - output_file = Path("tools/generated_classes.py") - generate_schema_wrapper(Path(schema_file), output_file) diff --git a/tools/generate_schema_wrapper_commented.py b/tools/generate_schema_wrapper_commented.py deleted file mode 100644 index 7328d880..00000000 --- a/tools/generate_schema_wrapper_commented.py +++ /dev/null @@ -1,1038 +0,0 @@ -from __future__ import annotations -"""Generate a schema wrapper from a schema.""" - -""" -(X) The file is organized into several key sections: - -1. **Constants**: - - This section defines constants that are used throughout the module for configuration and encoding methods. - -2. **Schema Generation Functions**: - - `generate_vegalite_schema_wrapper`: This function generates a schema wrapper for Vega-Lite based on the provided schema file. - - `load_schema_with_shorthand_properties`: Loads the schema and incorporates shorthand properties for easier usage. - - `_add_shorthand_property_to_field_encodings`: Adds shorthand properties to field encodings within the schema. - -3. **Utility Functions**: - - `copy_schemapi_util`: Copies the schemapi utility into the altair/utils directory for reuse. - - `recursive_dict_update`: Recursively updates a dictionary schema with new definitions, ensuring that references are resolved. - - `get_field_datum_value_defs`: Retrieves definitions for fields, datum, and values from a given property schema. - - `toposort`: Performs a topological sort on a directed acyclic graph, which is useful for managing dependencies between schema definitions. - -4. **Channel Wrapper Generation**: - - `generate_vegalite_channel_wrappers`: Generates channel wrappers for the Vega-Lite schema, allowing for the mapping of data properties to visual properties. - -5. **Mixin Generation**: - - `generate_vegalite_mark_mixin`: Creates a mixin class that defines methods for different types of marks in Vega-Lite. - - `generate_vegalite_config_mixin`: Generates a mixin class that provides configuration methods for the schema. - -6. **Main Execution Function**: - - `vegalite_main`: The main function that orchestrates the schema generation process, handling the loading of schemas and the creation of wrapper files. - -7. **Encoding Artifacts Generation**: - - `generate_encoding_artifacts`: Generates artifacts related to encoding, including type aliases and mixin classes for encoding methods. - -8. **Main Entry Point**: - - `main`: The entry point for the script, which processes command-line arguments and initiates the schema generation workflow. 
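For orientation: the script is intended to be run from the repository root (the sys.path insertion below assumes that working directory); a typical invocation that reuses an already-downloaded schema would be `python tools/generate_schema_wrapper.py --skip-download`.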
-""" - -import yaml -import argparse -import copy -import json -import re -import sys -import textwrap -from dataclasses import dataclass -from itertools import chain -from pathlib import Path -from typing import Final, Iterable, Iterator, Literal -from urllib import request - -import vl_convert as vlc - -sys.path.insert(0, str(Path.cwd())) -###(H) SchemaInfo class imported from altair/tools/schemapi/utils.py -### It's a wrapper for inspecting JSON schema -from tools.schemapi import CodeSnippet, SchemaInfo, codegen -from tools.schemapi.utils import ( - TypeAliasTracer, - get_valid_identifier, - indent_docstring, - resolve_references, - rst_parse, - rst_syntax_for_class, - ruff_format_py, - ruff_write_lint_format_str, - spell_literal, -) - -SCHEMA_VERSION: Final = "v5.20.1" - -reLink = re.compile(r"(?<=\[)([^\]]+)(?=\]\([^\)]+\))", re.MULTILINE) -reSpecial = re.compile(r"[*_]{2,3}|`", re.MULTILINE) - -HEADER: Final = """\ -# The contents of this file are automatically written by -# tools/generate_schema_wrapper.py. Do not modify directly. -""" - -SCHEMA_URL_TEMPLATE: Final = "https://vega.github.io/schema/{library}/{version}.json" - -CHANNEL_MYPY_IGNORE_STATEMENTS: Final = """\ -# These errors need to be ignored as they come from the overload methods -# which trigger two kind of errors in mypy: -# * all of them do not have an implementation in this file -# * some of them are the only overload methods -> overloads usually only make -# sense if there are multiple ones -# However, we need these overloads due to how the propertysetter works -# mypy: disable-error-code="no-overload-impl, empty-body, misc" -""" - -BASE_SCHEMA: Final = """ -class {basename}(SchemaBase): - _rootschema = load_schema() - @classmethod - def _default_wrapper_classes(cls) -> Iterator[type[Any]]: - return _subclasses({basename}) -""" - -LOAD_SCHEMA: Final = ''' -def load_schema() -> dict: - """Load the json schema associated with this module's functions""" - schema_bytes = pkgutil.get_data(__name__, "{schemafile}") - if schema_bytes is None: - raise ValueError("Unable to load {schemafile}") - return json.loads( - schema_bytes.decode("utf-8") - ) -''' - - -CHANNEL_MIXINS: Final = """ -class FieldChannelMixin: - _encoding_name: str - def to_dict( - self, - validate: bool = True, - ignore: list[str] | None = None, - context: dict[str, Any] | None = None, - ) -> dict | list[dict]: - context = context or {} - ignore = ignore or [] - shorthand = self._get("shorthand") # type: ignore[attr-defined] - field = self._get("field") # type: ignore[attr-defined] - - if shorthand is not Undefined and field is not Undefined: - msg = f"{self.__class__.__name__} specifies both shorthand={shorthand} and field={field}. 
" - raise ValueError(msg) - - if isinstance(shorthand, (tuple, list)): - # If given a list of shorthands, then transform it to a list of classes - kwds = self._kwds.copy() # type: ignore[attr-defined] - kwds.pop("shorthand") - return [ - self.__class__(sh, **kwds).to_dict( # type: ignore[call-arg] - validate=validate, ignore=ignore, context=context - ) - for sh in shorthand - ] - - if shorthand is Undefined: - parsed = {} - elif isinstance(shorthand, str): - data: nw.DataFrame | Any = context.get("data", None) - parsed = parse_shorthand(shorthand, data=data) - type_required = "type" in self._kwds # type: ignore[attr-defined] - type_in_shorthand = "type" in parsed - type_defined_explicitly = self._get("type") is not Undefined # type: ignore[attr-defined] - if not type_required: - # Secondary field names don't require a type argument in VegaLite 3+. - # We still parse it out of the shorthand, but drop it here. - parsed.pop("type", None) - elif not (type_in_shorthand or type_defined_explicitly): - if isinstance(data, nw.DataFrame): - msg = ( - f'Unable to determine data type for the field "{shorthand}";' - " verify that the field name is not misspelled." - " If you are referencing a field from a transform," - " also confirm that the data type is specified correctly." - ) - raise ValueError(msg) - else: - msg = ( - f"{shorthand} encoding field is specified without a type; " - "the type cannot be automatically inferred because " - "the data is not specified as a pandas.DataFrame." - ) - raise ValueError(msg) - else: - # Shorthand is not a string; we pass the definition to field, - # and do not do any parsing. - parsed = {"field": shorthand} - context["parsed_shorthand"] = parsed - - return super(FieldChannelMixin, self).to_dict( - validate=validate, ignore=ignore, context=context - ) - - -class ValueChannelMixin: - _encoding_name: str - def to_dict( - self, - validate: bool = True, - ignore: list[str] | None = None, - context: dict[str, Any] | None = None, - ) -> dict: - context = context or {} - ignore = ignore or [] - condition = self._get("condition", Undefined) # type: ignore[attr-defined] - copy = self # don't copy unless we need to - if condition is not Undefined: - if isinstance(condition, core.SchemaBase): - pass - elif "field" in condition and "type" not in condition: - kwds = parse_shorthand(condition["field"], context.get("data", None)) - copy = self.copy(deep=["condition"]) # type: ignore[attr-defined] - copy["condition"].update(kwds) # type: ignore[index] - return super(ValueChannelMixin, copy).to_dict( - validate=validate, ignore=ignore, context=context - ) - - -class DatumChannelMixin: - _encoding_name: str - def to_dict( - self, - validate: bool = True, - ignore: list[str] | None = None, - context: dict[str, Any] | None = None, - ) -> dict: - context = context or {} - ignore = ignore or [] - datum = self._get("datum", Undefined) # type: ignore[attr-defined] # noqa - copy = self # don't copy unless we need to - return super(DatumChannelMixin, copy).to_dict( - validate=validate, ignore=ignore, context=context - ) -""" - -MARK_METHOD: Final = ''' -def mark_{mark}({def_arglist}) -> Self: - """Set the chart's mark to '{mark}' (see :class:`{mark_def}`) - """ - kwds = dict({dict_arglist}) - copy = self.copy(deep=False) # type: ignore[attr-defined] - if any(val is not Undefined for val in kwds.values()): - copy.mark = core.{mark_def}(type="{mark}", **kwds) - else: - copy.mark = "{mark}" - return copy -''' - -CONFIG_METHOD: Final = """ -@use_signature(core.{classname}) -def 
{method}(self, *args, **kwargs) -> Self: - copy = self.copy(deep=False) # type: ignore[attr-defined] - copy.config = core.{classname}(*args, **kwargs) - return copy -""" - -CONFIG_PROP_METHOD: Final = """ -@use_signature(core.{classname}) -def configure_{prop}(self, *args, **kwargs) -> Self: - copy = self.copy(deep=['config']) # type: ignore[attr-defined] - if copy.config is Undefined: - copy.config = core.Config() - copy.config["{prop}"] = core.{classname}(*args, **kwargs) - return copy -""" - -ENCODE_METHOD: Final = ''' -class _EncodingMixin: - def encode({method_args}) -> Self: - """Map properties of the data to visual properties of the chart (see :class:`FacetedEncoding`) - {docstring}""" - # Compat prep for `infer_encoding_types` signature - kwargs = locals() - kwargs.pop("self") - args = kwargs.pop("args") - if args: - kwargs = {{k: v for k, v in kwargs.items() if v is not Undefined}} - - # Convert args to kwargs based on their types. - kwargs = _infer_encoding_types(args, kwargs) - # get a copy of the dict representation of the previous encoding - # ignore type as copy method comes from SchemaBase - copy = self.copy(deep=['encoding']) # type: ignore[attr-defined] - encoding = copy._get('encoding', {{}}) - if isinstance(encoding, core.VegaLiteSchema): - encoding = {{k: v for k, v in encoding._kwds.items() if v is not Undefined}} - # update with the new encodings, and apply them to the copy - encoding.update(kwargs) - copy.encoding = core.FacetedEncoding(**encoding) - return copy -''' - -ENCODE_TYPED_DICT: Final = ''' -class EncodeKwds(TypedDict, total=False): - """Encoding channels map properties of the data to visual properties of the chart. - {docstring}""" - {channels} - -''' - -# NOTE: Not yet reasonable to generalize `TypeAliasType`, `TypeVar` -# Revisit if this starts to become more common -TYPING_EXTRA: Final = ''' -T = TypeVar("T") -OneOrSeq = TypeAliasType("OneOrSeq", Union[T, Sequence[T]], type_params=(T,)) -"""One of ``T`` specified type(s), or a `Sequence` of such. - -Examples --------- -The parameters ``short``, ``long`` accept the same range of types:: - - # ruff: noqa: UP006, UP007 - - def func( - short: OneOrSeq[str | bool | float], - long: Union[str, bool, float, Sequence[Union[str, bool, float]], - ): ... 
-""" -''' - - -class SchemaGenerator(codegen.SchemaGenerator): - schema_class_template = textwrap.dedent( - ''' - class {classname}({basename}): - """{docstring}""" - _schema = {schema!r} - - {init_code} - ''' - ) - - @staticmethod - def _process_description(description: str) -> str: - return process_description(description) - - -def process_description(description: str) -> str: - # remove formatting from links - description = "".join( - [ - reSpecial.sub("", d) if i % 2 else d - for i, d in enumerate(reLink.split(description)) - ] - ) - description = rst_parse(description) - # Some entries in the Vega-Lite schema miss the second occurence of '__' - description = description.replace("__Default value: ", "__Default value:__ ") - # Fixing ambiguous unicode, RUF001 produces RUF002 in docs - description = description.replace("’", "'") # noqa: RUF001 [RIGHT SINGLE QUOTATION MARK] - description = description.replace("–", "-") # noqa: RUF001 [EN DASH] - description = description.replace(" ", " ") # noqa: RUF001 [NO-BREAK SPACE] - return description.strip() - - -class FieldSchemaGenerator(SchemaGenerator): - schema_class_template = textwrap.dedent( - ''' - @with_property_setters - class {classname}(FieldChannelMixin, core.{basename}): - """{docstring}""" - _class_is_valid_at_instantiation = False - _encoding_name = "{encodingname}" - - {method_code} - - {init_code} - ''' - ) - - -class ValueSchemaGenerator(SchemaGenerator): - schema_class_template = textwrap.dedent( - ''' - @with_property_setters - class {classname}(ValueChannelMixin, core.{basename}): - """{docstring}""" - _class_is_valid_at_instantiation = False - _encoding_name = "{encodingname}" - - {method_code} - - {init_code} - ''' - ) - - -class DatumSchemaGenerator(SchemaGenerator): - schema_class_template = textwrap.dedent( - ''' - @with_property_setters - class {classname}(DatumChannelMixin, core.{basename}): - """{docstring}""" - _class_is_valid_at_instantiation = False - _encoding_name = "{encodingname}" - - {method_code} - - {init_code} - ''' - ) - - -def schema_class(*args, **kwargs) -> str: - return SchemaGenerator(*args, **kwargs).schema_class() - - -def schema_url(version: str = SCHEMA_VERSION) -> str: - return SCHEMA_URL_TEMPLATE.format(library="vega-lite", version=version) - - -def download_schemafile( - version: str, schemapath: Path, skip_download: bool = False -) -> Path: - url = schema_url(version=version) - schemadir = Path(schemapath) - schemadir.mkdir(parents=True, exist_ok=True) - fp = schemadir / "vega-lite-schema.json" - if not skip_download: - request.urlretrieve(url, fp) - elif not fp.exists(): - msg = f"Cannot skip download: {fp!s} does not exist" - raise ValueError(msg) - return fp - - -def update_vega_themes(fp: Path, /, indent: str | int | None = 2) -> None: - themes = vlc.get_themes() - data = json.dumps(themes, indent=indent, sort_keys=True) - fp.write_text(data, encoding="utf8") - - theme_names = sorted(iter(themes)) - TypeAliasTracer.update_aliases(("VegaThemes", spell_literal(theme_names))) - - -def load_schema_with_shorthand_properties(schemapath: Path) -> dict: - with schemapath.open(encoding="utf8") as f: - schema = json.load(f) - - # At this point, schema is a python Dict - # Not sure what the below function does. 
It uses a lot of JSON logic - schema = _add_shorthand_property_to_field_encodings(schema) - return schema - - -def _add_shorthand_property_to_field_encodings(schema: dict) -> dict: - encoding_def = "FacetedEncoding" - - encoding = SchemaInfo(schema["definitions"][encoding_def], rootschema=schema) - - #print(yaml.dump(schema, default_flow_style=False)) - for _, propschema in encoding.properties.items(): - def_dict = get_field_datum_value_defs(propschema, schema) - - field_ref = def_dict.get("field") - if field_ref is not None: - defschema = {"$ref": field_ref} - defschema = copy.deepcopy(resolve_references(defschema, schema)) - # For Encoding field definitions, we patch the schema by adding the - # shorthand property. - defschema["properties"]["shorthand"] = { - "anyOf": [ - {"type": "string"}, - {"type": "array", "items": {"type": "string"}}, - {"$ref": "#/definitions/RepeatRef"}, - ], - "description": "shorthand for field, aggregate, and type", - } - if "required" not in defschema: - defschema["required"] = ["shorthand"] - elif "shorthand" not in defschema["required"]: - defschema["required"].append("shorthand") - schema["definitions"][field_ref.split("/")[-1]] = defschema - return schema - - -def copy_schemapi_util() -> None: - """Copy the schemapi utility into altair/utils/ and its test file to tests/utils/.""" - # copy the schemapi utility file - source_fp = Path(__file__).parent / "schemapi" / "schemapi.py" - destination_fp = Path(__file__).parent / ".." / "altair" / "utils" / "schemapi.py" - - print(f"Copying\n {source_fp!s}\n -> {destination_fp!s}") - with source_fp.open(encoding="utf8") as source, destination_fp.open( - "w", encoding="utf8" - ) as dest: - dest.write(HEADER) - dest.writelines(source.readlines()) - if sys.platform == "win32": - ruff_format_py(destination_fp) - - -def recursive_dict_update(schema: dict, root: dict, def_dict: dict) -> None: - if "$ref" in schema: - next_schema = resolve_references(schema, root) - if "properties" in next_schema: - definition = schema["$ref"] - properties = next_schema["properties"] - for k in def_dict: - if k in properties: - def_dict[k] = definition - else: - recursive_dict_update(next_schema, root, def_dict) - elif "anyOf" in schema: - for sub_schema in schema["anyOf"]: - recursive_dict_update(sub_schema, root, def_dict) - - -def get_field_datum_value_defs(propschema: SchemaInfo, root: dict) -> dict[str, str]: - def_dict: dict[str, str | None] = dict.fromkeys(("field", "datum", "value")) - schema = propschema.schema - if propschema.is_reference() and "properties" in schema: - if "field" in schema["properties"]: - def_dict["field"] = propschema.ref - else: - msg = "Unexpected schema structure" - raise ValueError(msg) - else: - recursive_dict_update(schema, root, def_dict) - - return {i: j for i, j in def_dict.items() if j} - - -def toposort(graph: dict[str, list[str]]) -> list[str]: - """ - Topological sort of a directed acyclic graph. - - Parameters - ---------- - graph : dict of lists - Mapping of node labels to list of child node labels. - This is assumed to represent a graph with no cycles. - - Returns - ------- - order : list - topological order of input graph. - """ - # Once we drop support for Python 3.8, this can potentially be replaced - # with graphlib.TopologicalSorter from the standard library. 
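# A rough sketch of that stdlib replacement (an assumption, untested here; requires Python >= 3.9).
# graphlib treats the mapped values as *predecessors*, whereas `graph` below maps each node to its
# children, so the stdlib result is reversed; tie-breaking among ready nodes may also differ from
# the sorted() traversal used below.
#
#     from graphlib import TopologicalSorter
#
#     def toposort_stdlib(graph: dict[str, list[str]]) -> list[str]:
#         return list(TopologicalSorter(graph).static_order())[::-1]
#
# For intuition, toposort({"A": ["B", "C"], "B": ["C"], "C": []}) returns ["A", "B", "C"]:
# a node always precedes the children listed under it.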
- stack: list[str] = [] - visited: dict[str, Literal[True]] = {} - - def visit(nodes): - for node in sorted(nodes, reverse=True): - if not visited.get(node): - visited[node] = True - visit(graph.get(node, [])) - stack.insert(0, node) - - visit(graph) - return stack - -### (X) Function to generate a schema wrapper for Vega-Lite. -def generate_vegalite_schema_wrapper(schema_file: Path) -> str: - """Generate a schema wrapper at the given path.""" - # TODO: generate simple tests for each wrapper - basename = "VegaLiteSchema" - - # Not sure what the below function does. It uses a lot of JSON logic - # I'm thinking of it as just loading the schema - rootschema = load_schema_with_shorthand_properties(schema_file) - - definitions: dict[str, SchemaGenerator] = {} - - ### (X) Loop through the definitions in the rootschema and create a SchemaGenerator for each one. - # There is a schema generator object for every single lowest level key in the JSON object - for name in rootschema["definitions"]: - defschema = {"$ref": "#/definitions/" + name} - defschema_repr = {"$ref": "#/definitions/" + name} - name = get_valid_identifier(name) - definitions[name] = SchemaGenerator( - name, - schema=defschema, - schemarepr=defschema_repr, - rootschema=rootschema, - basename=basename, - rootschemarepr=CodeSnippet(f"{basename}._rootschema"), - ) - - #print(definitions) - #print("\n\n\n") - - ### (X) Create a DAG of the definitions. - # The DAG consists of each lowest level key corresponding to an array of each in-document $ref - # reference in a dictionary - graph: dict[str, list[str]] = {} - - for name, schema in definitions.items(): - graph[name] = [] - for child_name in schema.subclasses(): - child_name = get_valid_identifier(child_name) - graph[name].append(child_name) - child: SchemaGenerator = definitions[child_name] - if child.basename == basename: - child.basename = [name] - else: - assert isinstance(child.basename, list) - child.basename.append(name) - - #print(graph) - - # Specify __all__ explicitly so that we can exclude the ones from the list - # of exported classes which are also defined in the channels or api modules which take - # precedence in the generated __init__.py files one and two levels up. - # Importing these classes from multiple modules confuses type checkers. - EXCLUDE = {"Color", "Text", "LookupData", "Dict", "FacetMapping"} - it = (c for c in definitions.keys() - EXCLUDE if not c.startswith("_")) - all_ = [*sorted(it), "Root", "VegaLiteSchema", "SchemaBase", "load_schema"] - - contents = [ - HEADER, - "from __future__ import annotations\n" - "from typing import Any, Literal, Union, Protocol, Sequence, List, Iterator, TYPE_CHECKING", - "import pkgutil", - "import json\n", - "from narwhals.dependencies import is_pandas_dataframe as _is_pandas_dataframe", - "from altair.utils.schemapi import SchemaBase, Undefined, UndefinedType, _subclasses # noqa: F401\n", - _type_checking_only_imports( - "from altair import Parameter", - "from altair.typing import Optional", - "from ._typing import * # noqa: F403", - ), - "\n" f"__all__ = {all_}\n", - LOAD_SCHEMA.format(schemafile="vega-lite-schema.json"), - BASE_SCHEMA.format(basename=basename), - schema_class( - "Root", - schema=rootschema, - basename=basename, - schemarepr=CodeSnippet(f"{basename}._rootschema"), - ), - ] - - ### (X) Append the schema classes in topological order to the contents. 
- # This sort puts the edges at the start of the reference chain first - for name in toposort(graph): - contents.append(definitions[name].schema_class()) - - contents.append("") # end with newline - return "\n".join(contents) - - -def _type_checking_only_imports(*imports: str) -> str: - return ( - "\n# ruff: noqa: F405\nif TYPE_CHECKING:\n" - + "\n".join(f" {s}" for s in imports) - + "\n" - ) - - -@dataclass -class ChannelInfo: - supports_arrays: bool - deep_description: str - field_class_name: str - datum_class_name: str | None = None - value_class_name: str | None = None - - @property - def is_field_only(self) -> bool: - return not (self.datum_class_name or self.value_class_name) - - @property - def all_names(self) -> Iterator[str]: - """All channels are expected to have a field class.""" - yield self.field_class_name - yield from self.non_field_names - - @property - def non_field_names(self) -> Iterator[str]: - if self.is_field_only: - yield from () - else: - if self.datum_class_name: - yield self.datum_class_name - if self.value_class_name: - yield self.value_class_name - - -def generate_vegalite_channel_wrappers( - schemafile: Path, version: str, imports: list[str] | None = None -) -> str: - schema = load_schema_with_shorthand_properties(schemafile) - - encoding_def = "FacetedEncoding" - - encoding = SchemaInfo(schema["definitions"][encoding_def], rootschema=schema) - - channel_infos: dict[str, ChannelInfo] = {} - - class_defs = [] - - for prop, propschema in encoding.properties.items(): - def_dict = get_field_datum_value_defs(propschema, schema) - - supports_arrays = any( - schema_info.is_array() for schema_info in propschema.anyOf - ) - classname: str = prop[0].upper() + prop[1:] - channel_info = ChannelInfo( - supports_arrays=supports_arrays, - deep_description=propschema.deep_description, - field_class_name=classname, - ) - - for encoding_spec, definition in def_dict.items(): - basename = definition.rsplit("/", maxsplit=1)[-1] - basename = get_valid_identifier(basename) - - gen: SchemaGenerator - defschema = {"$ref": definition} - kwds = { - "basename": basename, - "schema": defschema, - "rootschema": schema, - "encodingname": prop, - "haspropsetters": True, - } - if encoding_spec == "field": - gen = FieldSchemaGenerator(classname, nodefault=[], **kwds) - elif encoding_spec == "datum": - temp_name = f"{classname}Datum" - channel_info.datum_class_name = temp_name - gen = DatumSchemaGenerator(temp_name, nodefault=["datum"], **kwds) - elif encoding_spec == "value": - temp_name = f"{classname}Value" - channel_info.value_class_name = temp_name - gen = ValueSchemaGenerator(temp_name, nodefault=["value"], **kwds) - - class_defs.append(gen.schema_class()) - - channel_infos[prop] = channel_info - - # NOTE: See https://github.com/vega/altair/pull/3482#issuecomment-2241577342 - COMPAT_EXPORTS = ( - "DatumChannelMixin", - "FieldChannelMixin", - "ValueChannelMixin", - "with_property_setters", - ) - - it = chain.from_iterable(info.all_names for info in channel_infos.values()) - all_ = list(chain(it, COMPAT_EXPORTS)) - - imports = imports or [ - "from __future__ import annotations\n", - "from typing import Any, overload, Sequence, List, Literal, Union, TYPE_CHECKING, TypedDict", - "from typing_extensions import TypeAlias", - "import narwhals.stable.v1 as nw", - "from altair.utils.schemapi import Undefined, with_property_setters", - "from altair.utils import infer_encoding_types as _infer_encoding_types", - "from altair.utils import parse_shorthand", - "from . 
import core", - "from ._typing import * # noqa: F403", - ] - contents = [ - HEADER, - CHANNEL_MYPY_IGNORE_STATEMENTS, - *imports, - _type_checking_only_imports( - "from altair import Parameter, SchemaBase", - "from altair.typing import Optional", - "from typing_extensions import Self", - ), - "\n" f"__all__ = {sorted(all_)}\n", - CHANNEL_MIXINS, - *class_defs, - *generate_encoding_artifacts(channel_infos, ENCODE_METHOD, ENCODE_TYPED_DICT), - ] - return "\n".join(contents) - - -def generate_vegalite_mark_mixin( - schemafile: Path, markdefs: dict[str, str] -) -> tuple[list[str], str]: - with schemafile.open(encoding="utf8") as f: - schema = json.load(f) - - class_name = "MarkMethodMixin" - - imports = [ - "from typing import Any, Sequence, List, Literal, Union", - "", - "from altair.utils.schemapi import Undefined, UndefinedType", - "from . import core", - ] - - code = [ - f"class {class_name}:", - ' """A mixin class that defines mark methods"""', - ] - - for mark_enum, mark_def in markdefs.items(): - if "enum" in schema["definitions"][mark_enum]: - marks = schema["definitions"][mark_enum]["enum"] - else: - marks = [schema["definitions"][mark_enum]["const"]] - info = SchemaInfo({"$ref": f"#/definitions/{mark_def}"}, rootschema=schema) - - # adapted from SchemaInfo.init_code - arg_info = codegen.get_args(info) - arg_info.required -= {"type"} - arg_info.kwds -= {"type"} - - def_args = ["self"] + [ - f"{p}: " - + info.properties[p].get_python_type_representation( - for_type_hints=True, - additional_type_hints=["UndefinedType"], - ) - + " = Undefined" - for p in (sorted(arg_info.required) + sorted(arg_info.kwds)) - ] - dict_args = [ - f"{p}={p}" for p in (sorted(arg_info.required) + sorted(arg_info.kwds)) - ] - - if arg_info.additional or arg_info.invalid_kwds: - def_args.append("**kwds") - dict_args.append("**kwds") - - for mark in marks: - # TODO: only include args relevant to given type? - mark_method = MARK_METHOD.format( - mark=mark, - mark_def=mark_def, - def_arglist=", ".join(def_args), - dict_arglist=", ".join(dict_args), - ) - code.append("\n ".join(mark_method.splitlines())) - - return imports, "\n".join(code) - - -def generate_vegalite_config_mixin(schemafile: Path) -> tuple[list[str], str]: - imports = [ - "from . import core", - "from altair.utils import use_signature", - ] - - class_name = "ConfigMethodMixin" - - code = [ - f"class {class_name}:", - ' """A mixin class that defines config methods"""', - ] - with schemafile.open(encoding="utf8") as f: - schema = json.load(f) - info = SchemaInfo({"$ref": "#/definitions/Config"}, rootschema=schema) - - # configure() method - method = CONFIG_METHOD.format(classname="Config", method="configure") - code.append("\n ".join(method.splitlines())) - - # configure_prop() methods - for prop, prop_info in info.properties.items(): - classname = prop_info.refname - if classname and classname.endswith("Config"): - method = CONFIG_PROP_METHOD.format(classname=classname, prop=prop) - code.append("\n ".join(method.splitlines())) - return imports, "\n".join(code) - - -def vegalite_main(skip_download: bool = False) -> None: - version = SCHEMA_VERSION - ###(H) Below just gets the path to vegalite main file - vn = version.split(".")[0] - fp = (Path(__file__).parent / ".." 
/ "altair" / "vegalite" / vn).resolve() - schemapath = fp / "schema" - ###(H) They download the schema, eg: altair/altair/vegalite/v5/schema/vega-lite-schema.json - schemafile = download_schemafile( - version=version, - schemapath=schemapath, - skip_download=skip_download, - ) - - fp_themes = schemapath / "vega-themes.json" - print(f"Updating themes\n {schemafile!s}\n ->{fp_themes!s}") - update_vega_themes(fp_themes) - - # Generate __init__.py file - outfile = schemapath / "__init__.py" - print(f"Writing {outfile!s}") - # The content is written word for word as seen - content = [ - "# ruff: noqa\n", - "from .core import *\nfrom .channels import *\n", - f"SCHEMA_VERSION = '{version}'\n", - f"SCHEMA_URL = {schema_url(version)!r}\n", - ] - ###(H)ruff is a python 'linter' written in Rust, which is essentially - ###syntax formatting and checking. - ###The function below is a combination of writing, ruff checking and formatting - ruff_write_lint_format_str(outfile, content) - - # TypeAliasTracer is imported from utils.py and keeps track of all aliases for literals - TypeAliasTracer.update_aliases(("Map", "Mapping[str, Any]")) - - ###(H) Note: Path is a type imported from pathlib. Every Path added to the files - ### dictionary is eventually written to and formatted using ruff - files: dict[Path, str | Iterable[str]] = {} - - # Generate the core schema wrappers - fp_core = schemapath / "core.py" - print(f"Generating\n {schemafile!s}\n ->{fp_core!s}") - # Reminder: the schemafile here is the downloaded reference schemafile - files[fp_core] = generate_vegalite_schema_wrapper(schemafile) - - # Generate the channel wrappers - fp_channels = schemapath / "channels.py" - print(f"Generating\n {schemafile!s}\n ->{fp_channels!s}") - files[fp_channels] = generate_vegalite_channel_wrappers(schemafile, version=version) - - # generate the mark mixin - # A mixin class is one which provides functionality to other classes as a standalone class - markdefs = {k: f"{k}Def" for k in ["Mark", "BoxPlot", "ErrorBar", "ErrorBand"]} - fp_mixins = schemapath / "mixins.py" - print(f"Generating\n {schemafile!s}\n ->{fp_mixins!s}") - - # The following function dynamically creates a mixin class that can be used for 'marks' (eg. bars on bar chart, dot on scatter) - mark_imports, mark_mixin = generate_vegalite_mark_mixin(schemafile, markdefs) - config_imports, config_mixin = generate_vegalite_config_mixin(schemafile) - try_except_imports = [ - "if sys.version_info >= (3, 11):", - " from typing import Self", - "else:", - " from typing_extensions import Self", - ] - stdlib_imports = ["from __future__ import annotations\n", "import sys"] - content_mixins = [ - HEADER, - "\n".join(stdlib_imports), - "\n\n", - "\n".join(sorted({*mark_imports, *config_imports})), - "\n\n", - "\n".join(try_except_imports), - "\n\n", - _type_checking_only_imports( - "from altair import Parameter, SchemaBase", - "from altair.typing import Optional", - "from ._typing import * # noqa: F403", - ), - "\n\n\n", - mark_mixin, - "\n\n\n", - config_mixin, - ] - files[fp_mixins] = content_mixins - - # Write `_typing.py` TypeAlias, for import in generated modules - fp_typing = schemapath / "_typing.py" - msg = ( - f"Generating\n {schemafile!s}\n ->{fp_typing!s}\n" - f"Tracer cache collected {TypeAliasTracer.n_entries!r} entries." 
- ) - print(msg) - TypeAliasTracer.write_module( - fp_typing, "OneOrSeq", header=HEADER, extra=TYPING_EXTRA - ) - # Write the pre-generated modules - for fp, contents in files.items(): - print(f"Writing\n {schemafile!s}\n ->{fp!s}") - ruff_write_lint_format_str(fp, contents) - - -def generate_encoding_artifacts( - channel_infos: dict[str, ChannelInfo], fmt_method: str, fmt_typed_dict: str -) -> Iterator[str]: - """ - Generate ``Chart.encode()`` and related typing structures. - - - `TypeAlias`(s) for each parameter to ``Chart.encode()`` - - Mixin class that provides the ``Chart.encode()`` method - - `TypedDict`, utilising/describing these structures as part of https://github.com/pola-rs/polars/pull/17995. - - Notes - ----- - - `Map`/`Dict` stands for the return types of `alt.(datum|value)`, and any encoding channel class. - - See discussions in https://github.com/vega/altair/pull/3208 - - We could be more specific about what types are accepted in the `List` - - but this translates poorly to an IDE - - `info.supports_arrays` - """ - signature_args: list[str] = ["self", "*args: Any"] - type_aliases: list[str] = [] - typed_dict_args: list[str] = [] - signature_doc_params: list[str] = ["", "Parameters", "----------"] - typed_dict_doc_params: list[str] = ["", "Parameters", "----------"] - - for channel, info in channel_infos.items(): - alias_name: str = f"Channel{channel[0].upper()}{channel[1:]}" - - it: Iterator[str] = info.all_names - it_rst_names: Iterator[str] = (rst_syntax_for_class(c) for c in info.all_names) - - docstring_types: list[str] = ["str", next(it_rst_names), "Dict"] - tp_inner: str = ", ".join(chain(("str", next(it), "Map"), it)) - tp_inner = f"Union[{tp_inner}]" - - if info.supports_arrays: - docstring_types.append("List") - tp_inner = f"OneOrSeq[{tp_inner}]" - - doc_types_flat: str = ", ".join(chain(docstring_types, it_rst_names)) - - type_aliases.append(f"{alias_name}: TypeAlias = {tp_inner}") - # We use the full type hints instead of the alias in the signatures below - # as IDEs such as VS Code would otherwise show the name of the alias instead - # of the expanded full type hints. The latter are more useful to users. - typed_dict_args.append(f"{channel}: {tp_inner}") - signature_args.append(f"{channel}: Optional[{tp_inner}] = Undefined") - - description: str = f" {process_description(info.deep_description)}" - - signature_doc_params.extend((f"{channel} : {doc_types_flat}", description)) - typed_dict_doc_params.extend((f"{channel}", description)) - - method: str = fmt_method.format( - method_args=", ".join(signature_args), - docstring=indent_docstring(signature_doc_params, indent_level=8, lstrip=False), - ) - typed_dict: str = fmt_typed_dict.format( - channels="\n ".join(typed_dict_args), - docstring=indent_docstring(typed_dict_doc_params, indent_level=4, lstrip=False), - ) - artifacts: Iterable[str] = *type_aliases, method, typed_dict - yield from artifacts - - -def main() -> None: - parser = argparse.ArgumentParser( - prog="generate_schema_wrapper.py", description="Generate the Altair package." - ) - parser.add_argument( - "--skip-download", action="store_true", help="skip downloading schema files" - ) - ###(H) I've used this library before. The below just does the actual arg parsing - args = parser.parse_args() - ###(H) Copies the schemapi.py file from schemapi to ../altair/utils - copy_schemapi_util() - - vegalite_main(args.skip_download) - - # The modules below are imported after the generation of the new schema files - # as these modules import Altair. 
This allows them to use the new changes - from tools import generate_api_docs, update_init_file - - generate_api_docs.write_api_file() - update_init_file.update__all__variable() - - -if __name__ == "__main__": - main() diff --git a/tools/generated_classes.py b/tools/generated_classes.py deleted file mode 100644 index 437d65d4..00000000 --- a/tools/generated_classes.py +++ /dev/null @@ -1,37 +0,0 @@ -from typing import Any, Union -class AggregateExpression: - def __init__(self, agg: str, label: str = None): - self.agg = agg - self.label = label - -class ParamRef: - def __init__(self): - pass - -class TransformField: - def __init__(self, value: Union["str", "ParamRef"]): - self.value = value - -class AggregateTransform: - def __init__(self, value: Union["Argmax", "Argmin", "Avg", "Count", "Max", "Min", "First", "Last", "Median", "Mode", "Product", "Quantile", "Stddev", "StddevPop", "Sum", "Variance", "VarPop"]): - self.value = value - -class Argmax: - def __init__(self, argmax: Any, distinct: bool = None, orderby: Union[TransformField, Any] = None, partitionby: Union[TransformField, Any] = None, range: Union[Any, ParamRef] = None, rows: Union[Any, ParamRef] = None): - self.argmax = argmax - self.distinct = distinct - self.orderby = orderby - self.partitionby = partitionby - self.range = range - self.rows = rows - -class Argmin: - def __init__(self, argmin: Any, distinct: bool = None, orderby: Union[TransformField, Any] = None, partitionby: Union[TransformField, Any] = None, range: Union[Any, ParamRef] = None, rows: Union[Any, ParamRef] = None): - self.argmin = argmin - self.distinct = distinct - self.orderby = orderby - self.partitionby = partitionby - self.range = range - self.rows = rows - - diff --git a/tools/pyproject.toml b/tools/pyproject.toml deleted file mode 100644 index 81e724c2..00000000 --- a/tools/pyproject.toml +++ /dev/null @@ -1,22 +0,0 @@ -[build-system] -requires = ["setuptools>=61.0"] -build-backend = "setuptools.build_meta" - -[project] -name = "schema-wrapper" -version = "0.1.0" -description = "A tool to generate schema wrapper classes" -readme = "README.md" -requires-python = ">=3.7" -classifiers = [ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: MIT License", - "Operating System :: OS Independent", -] - -[project.scripts] -generate-schema-wrapper = "schema_wrapper.generate_schema_wrapper:main" - -[tool.setuptools.packages.find] -where = ["."] -include = ["schema_wrapper*"] diff --git a/tools/schema_wrapper.egg-info/PKG-INFO b/tools/schema_wrapper.egg-info/PKG-INFO deleted file mode 100644 index a28858a1..00000000 --- a/tools/schema_wrapper.egg-info/PKG-INFO +++ /dev/null @@ -1,9 +0,0 @@ -Metadata-Version: 2.1 -Name: schema-wrapper -Version: 0.1.0 -Summary: A tool to generate schema wrapper classes -Classifier: Programming Language :: Python :: 3 -Classifier: License :: OSI Approved :: MIT License -Classifier: Operating System :: OS Independent -Requires-Python: >=3.7 -Description-Content-Type: text/markdown diff --git a/tools/schema_wrapper.egg-info/SOURCES.txt b/tools/schema_wrapper.egg-info/SOURCES.txt deleted file mode 100644 index 57d8ad8b..00000000 --- a/tools/schema_wrapper.egg-info/SOURCES.txt +++ /dev/null @@ -1,9 +0,0 @@ -pyproject.toml -schema_wrapper/__init__.py -schema_wrapper/generate_schema_wrapper.py -schema_wrapper/utils.py -schema_wrapper.egg-info/PKG-INFO -schema_wrapper.egg-info/SOURCES.txt -schema_wrapper.egg-info/dependency_links.txt -schema_wrapper.egg-info/entry_points.txt -schema_wrapper.egg-info/top_level.txt \ No 
newline at end of file diff --git a/tools/schema_wrapper.egg-info/dependency_links.txt b/tools/schema_wrapper.egg-info/dependency_links.txt deleted file mode 100644 index 8b137891..00000000 --- a/tools/schema_wrapper.egg-info/dependency_links.txt +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tools/schema_wrapper.egg-info/entry_points.txt b/tools/schema_wrapper.egg-info/entry_points.txt deleted file mode 100644 index 8895e490..00000000 --- a/tools/schema_wrapper.egg-info/entry_points.txt +++ /dev/null @@ -1,2 +0,0 @@ -[console_scripts] -generate-schema-wrapper = schema_wrapper.generate_schema_wrapper:main diff --git a/tools/schema_wrapper.egg-info/top_level.txt b/tools/schema_wrapper.egg-info/top_level.txt deleted file mode 100644 index 7f3da045..00000000 --- a/tools/schema_wrapper.egg-info/top_level.txt +++ /dev/null @@ -1 +0,0 @@ -schema_wrapper diff --git a/tools/schemapi/__init__.py b/tools/schemapi/__init__.py deleted file mode 100755 index 023a9a2a..00000000 --- a/tools/schemapi/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -"""schemapi: tools for generating Python APIs from JSON schemas.""" - -from tools.schemapi import codegen, utils -from tools.schemapi.codegen import CodeSnippet -from tools.schemapi.schemapi import SchemaBase, Undefined -from tools.schemapi.utils import SchemaInfo - -__all__ = ["CodeSnippet", "SchemaBase", "SchemaInfo", "Undefined", "codegen", "utils"] diff --git a/tools/schemapi/__pycache__/__init__.cpython-311.pyc b/tools/schemapi/__pycache__/__init__.cpython-311.pyc deleted file mode 100644 index 244c9bd4..00000000 Binary files a/tools/schemapi/__pycache__/__init__.cpython-311.pyc and /dev/null differ diff --git a/tools/schemapi/__pycache__/codegen.cpython-311.pyc b/tools/schemapi/__pycache__/codegen.cpython-311.pyc deleted file mode 100644 index 4f967470..00000000 Binary files a/tools/schemapi/__pycache__/codegen.cpython-311.pyc and /dev/null differ diff --git a/tools/schemapi/__pycache__/schemapi.cpython-311.pyc b/tools/schemapi/__pycache__/schemapi.cpython-311.pyc deleted file mode 100644 index 110df5c6..00000000 Binary files a/tools/schemapi/__pycache__/schemapi.cpython-311.pyc and /dev/null differ diff --git a/tools/schemapi/__pycache__/schemapi.cpython-39.pyc b/tools/schemapi/__pycache__/schemapi.cpython-39.pyc deleted file mode 100644 index 4579e7d8..00000000 Binary files a/tools/schemapi/__pycache__/schemapi.cpython-39.pyc and /dev/null differ diff --git a/tools/schemapi/__pycache__/utils.cpython-311.pyc b/tools/schemapi/__pycache__/utils.cpython-311.pyc deleted file mode 100644 index 7f61e2d0..00000000 Binary files a/tools/schemapi/__pycache__/utils.cpython-311.pyc and /dev/null differ diff --git a/tools/schemapi/codegen.py b/tools/schemapi/codegen.py deleted file mode 100755 index cf8ea81b..00000000 --- a/tools/schemapi/codegen.py +++ /dev/null @@ -1,380 +0,0 @@ -"""Code generation utilities.""" - -from __future__ import annotations - -import re -import textwrap -from dataclasses import dataclass -from typing import Final - -from .utils import ( - SchemaInfo, - TypeAliasTracer, - flatten, - indent_docstring, - is_valid_identifier, - jsonschema_to_python_types, - spell_literal, -) - - -class CodeSnippet: - """Object whose repr() is a string of code.""" - - def __init__(self, code: str): - self.code = code - - def __repr__(self) -> str: - return self.code - - -@dataclass -class ArgInfo: - nonkeyword: bool - required: set[str] - kwds: set[str] - invalid_kwds: set[str] - additional: bool - - -def get_args(info: SchemaInfo) -> ArgInfo: - """Return the list of 
args & kwds for building the __init__ function.""" - # TODO: - set additional properties correctly - # - handle patternProperties etc. - required: set[str] = set() - kwds: set[str] = set() - invalid_kwds: set[str] = set() - - # TODO: specialize for anyOf/oneOf? - - if info.is_allOf(): - # recursively call function on all children - arginfo = [get_args(child) for child in info.allOf] - nonkeyword = all(args.nonkeyword for args in arginfo) - required = set.union(set(), *(args.required for args in arginfo)) - kwds = set.union(set(), *(args.kwds for args in arginfo)) - kwds -= required - invalid_kwds = set.union(set(), *(args.invalid_kwds for args in arginfo)) - additional = all(args.additional for args in arginfo) - elif info.is_empty() or info.is_compound(): - nonkeyword = True - additional = True - elif info.is_value(): - nonkeyword = True - additional = False - elif info.is_object(): - invalid_kwds = {p for p in info.required if not is_valid_identifier(p)} | { - p for p in info.properties if not is_valid_identifier(p) - } - required = {p for p in info.required if is_valid_identifier(p)} - kwds = {p for p in info.properties if is_valid_identifier(p)} - kwds -= required - nonkeyword = False - additional = True - # additional = info.additionalProperties or info.patternProperties - else: - msg = "Schema object not understood" - raise ValueError(msg) - - return ArgInfo( - nonkeyword=nonkeyword, - required=required, - kwds=kwds, - invalid_kwds=invalid_kwds, - additional=additional, - ) - - -class SchemaGenerator: - """ - Class that defines methods for generating code from schemas. - - Parameters - ---------- - classname : string - The name of the class to generate - schema : dict - The dictionary defining the schema class - rootschema : dict (optional) - The root schema for the class - basename : string or list of strings (default: "SchemaBase") - The name(s) of the base class(es) to use in the class definition - schemarepr : CodeSnippet or object, optional - An object whose repr will be used in the place of the explicit schema. - This can be useful, for example, when the generated code should reference - a predefined schema object. The user must ensure that the schema within - the evaluated code is identical to the schema used to generate the code. - rootschemarepr : CodeSnippet or object, optional - An object whose repr will be used in the place of the explicit root - schema. - **kwargs : dict - Additional keywords for derived classes. 
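Examples
--------
A minimal sketch of typical use, mirroring how generate_schema_wrapper.py drives this class (the definition name is illustrative)::

    gen = SchemaGenerator(
        "Root",
        schema={"$ref": "#/definitions/Root"},
        rootschema=rootschema,
        basename="VegaLiteSchema",
    )
    source = gen.schema_class()  # the generated class definition, as a str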
- """ - - schema_class_template = textwrap.dedent( - ''' - class {classname}({basename}): - """{docstring}""" - _schema = {schema!r} - _rootschema = {rootschema!r} - - {init_code} - ''' - ) - - init_template: Final = textwrap.dedent( - """ - def __init__({arglist}): - super({classname}, self).__init__({super_arglist}) - """ - ).lstrip() - - def _process_description(self, description: str): - return description - - def __init__( - self, - classname: str, - schema: dict, - rootschema: dict | None = None, - basename: str | list[str] = "SchemaBase", - schemarepr: object | None = None, - rootschemarepr: object | None = None, - nodefault: list[str] | None = None, - haspropsetters: bool = False, - **kwargs, - ) -> None: - self.classname = classname - self.schema = schema - self.rootschema = rootschema - self.basename = basename - self.schemarepr = schemarepr - self.rootschemarepr = rootschemarepr - self.nodefault = nodefault or () - self.haspropsetters = haspropsetters - self.kwargs = kwargs - - def subclasses(self) -> list[str]: - """Return a list of subclass names, if any.""" - info = SchemaInfo(self.schema, self.rootschema) - return [child.refname for child in info.anyOf if child.is_reference()] - - def schema_class(self) -> str: - """Generate code for a schema class.""" - rootschema: dict = ( - self.rootschema if self.rootschema is not None else self.schema - ) - schemarepr: object = ( - self.schemarepr if self.schemarepr is not None else self.schema - ) - rootschemarepr = self.rootschemarepr - if rootschemarepr is None: - if rootschema is self.schema: - rootschemarepr = CodeSnippet("_schema") - else: - rootschemarepr = rootschema - if isinstance(self.basename, str): - basename = self.basename - else: - basename = ", ".join(self.basename) - return self.schema_class_template.format( - classname=self.classname, - basename=basename, - schema=schemarepr, - rootschema=rootschemarepr, - docstring=self.docstring(indent=4), - init_code=self.init_code(indent=4), - method_code=self.method_code(indent=4), - **self.kwargs, - ) - - @property - def info(self) -> SchemaInfo: - return SchemaInfo(self.schema, self.rootschema) - - @property - def arg_info(self) -> ArgInfo: - return get_args(self.info) - - def docstring(self, indent: int = 0) -> str: - info = self.info - # https://numpydoc.readthedocs.io/en/latest/format.html#short-summary - doc = [f"{self.classname} schema wrapper"] - if info.description: - # https://numpydoc.readthedocs.io/en/latest/format.html#extended-summary - # Remove condition from description - desc: str = re.sub(r"\n\{\n(\n|.)*\n\}", "", info.description) - ext_summary: list[str] = self._process_description(desc).splitlines() - # Remove lines which contain the "raw-html" directive which cannot be processed - # by Sphinx at this level of the docstring. It works for descriptions - # of attributes which is why we do not do the same below. The removed - # lines are anyway non-descriptive for a user. - ext_summary = [line for line in ext_summary if ":raw-html:" not in line] - # Only add an extended summary if the above did not result in an empty list. 
- if ext_summary: - doc.append("") - doc.extend(ext_summary) - - if info.properties: - arg_info = self.arg_info - doc += ["", "Parameters", "----------", ""] - for prop in ( - sorted(arg_info.required) - + sorted(arg_info.kwds) - + sorted(arg_info.invalid_kwds) - ): - propinfo = info.properties[prop] - doc += [ - f"{prop} : {propinfo.get_python_type_representation()}", - f" {self._process_description(propinfo.deep_description)}", - ] - return indent_docstring(doc, indent_level=indent, width=100, lstrip=True) - - def init_code(self, indent: int = 0) -> str: - """Return code suitable for the __init__ function of a Schema class.""" - args, super_args = self.init_args() - - initfunc = self.init_template.format( - classname=self.classname, - arglist=", ".join(args), - super_arglist=", ".join(super_args), - ) - if indent: - initfunc = ("\n" + indent * " ").join(initfunc.splitlines()) - return initfunc - - def init_args( - self, additional_types: list[str] | None = None - ) -> tuple[list[str], list[str]]: - additional_types = additional_types or [] - info = self.info - arg_info = self.arg_info - - nodefault = set(self.nodefault) - arg_info.required -= nodefault - arg_info.kwds -= nodefault - - args: list[str] = ["self"] - super_args: list[str] = [] - - self.init_kwds = sorted(arg_info.kwds) - - if nodefault: - args.extend(sorted(nodefault)) - elif arg_info.nonkeyword: - args.append("*args") - super_args.append("*args") - - args.extend( - f"{p}: Optional[Union[" - + ", ".join( - [ - *additional_types, - *info.properties[p].get_python_type_representation( - for_type_hints=True, return_as_str=False - ), - ] - ) - + "]] = Undefined" - for p in sorted(arg_info.required) + sorted(arg_info.kwds) - ) - super_args.extend( - f"{p}={p}" - for p in sorted(nodefault) - + sorted(arg_info.required) - + sorted(arg_info.kwds) - ) - - if arg_info.additional: - args.append("**kwds") - super_args.append("**kwds") - return args, super_args - - def get_args(self, si: SchemaInfo) -> list[str]: - contents = ["self"] - prop_infos: dict[str, SchemaInfo] = {} - if si.is_anyOf(): - prop_infos = {} - for si_sub in si.anyOf: - prop_infos.update(si_sub.properties) - elif si.properties: - prop_infos = dict(si.properties.items()) - - if prop_infos: - contents.extend( - [ - f"{p}: " - + info.get_python_type_representation( - for_type_hints=True, additional_type_hints=["UndefinedType"] - ) - + " = Undefined" - for p, info in prop_infos.items() - ] - ) - elif si.type: - py_type = jsonschema_to_python_types[si.type] - if py_type == "list": - # Try to get a type hint like "List[str]" which is more specific - # than just "list" - item_vl_type = si.items.get("type", None) - if item_vl_type is not None: - item_type = jsonschema_to_python_types[item_vl_type] - else: - item_si = SchemaInfo(si.items, self.rootschema) - assert item_si.is_reference() - altair_class_name = item_si.title - item_type = f"core.{altair_class_name}" - py_type = f"List[{item_type}]" - elif si.is_literal(): - # If it's an enum, we can type hint it as a Literal which tells - # a type checker that only the values in enum are acceptable - py_type = TypeAliasTracer.add_literal( - si, spell_literal(si.literal), replace=True - ) - contents.append(f"_: {py_type}") - - contents.append("**kwds") - - return contents - - def get_signature( - self, attr: str, sub_si: SchemaInfo, indent: int, has_overload: bool = False - ) -> list[str]: - lines = [] - if has_overload: - lines.append("@overload") - args = ", ".join(self.get_args(sub_si)) - lines.extend( - (f"def {attr}({args}) -> 
'{self.classname}':", indent * " " + "...\n") - ) - return lines - - def setter_hint(self, attr: str, indent: int) -> list[str]: - si = SchemaInfo(self.schema, self.rootschema).properties[attr] - if si.is_anyOf(): - return self._get_signature_any_of(si, attr, indent) - else: - return self.get_signature(attr, si, indent, has_overload=True) - - def _get_signature_any_of( - self, si: SchemaInfo, attr: str, indent: int - ) -> list[str]: - signatures = [] - for sub_si in si.anyOf: - if sub_si.is_anyOf(): - # Recursively call method again to go a level deeper - signatures.extend(self._get_signature_any_of(sub_si, attr, indent)) - else: - signatures.extend( - self.get_signature(attr, sub_si, indent, has_overload=True) - ) - return list(flatten(signatures)) - - def method_code(self, indent: int = 0) -> str | None: - """Return code to assist setter methods.""" - if not self.haspropsetters: - return None - args = self.init_kwds - type_hints = [hint for a in args for hint in self.setter_hint(a, indent)] - - return ("\n" + indent * " ").join(type_hints) diff --git a/tools/schemapi/schemapi.py b/tools/schemapi/schemapi.py deleted file mode 100644 index e69de29b..00000000 diff --git a/tools/schemapi/utils.py b/tools/schemapi/utils.py deleted file mode 100755 index 3fa30492..00000000 --- a/tools/schemapi/utils.py +++ /dev/null @@ -1,902 +0,0 @@ -"""Utilities for working with schemas.""" - -from __future__ import annotations - -import keyword -import re -import subprocess -import textwrap -import urllib -from html import unescape -from itertools import chain -from operator import itemgetter -from typing import ( - TYPE_CHECKING, - Any, - Final, - Iterable, - Iterator, - Literal, - Sequence, - overload, -) - -import mistune -from mistune.renderers.rst import RSTRenderer as _RSTRenderer - -from tools.schemapi.schemapi import _resolve_references as resolve_references - -if TYPE_CHECKING: - from pathlib import Path - from typing_extensions import LiteralString - - from mistune import BlockState - -EXCLUDE_KEYS: Final = ("definitions", "title", "description", "$schema", "id") - -jsonschema_to_python_types = { - "string": "str", - "number": "float", - "integer": "int", - "object": "Map", - "boolean": "bool", - "array": "list", - "null": "None", -} - - -class _TypeAliasTracer: - """ - Recording all `enum` -> `Literal` translations. - - Rewrites as `TypeAlias` to be reused anywhere, and not clog up method definitions. - - Parameters - ---------- - fmt - A format specifier to produce the `TypeAlias` name. - - Will be provided a `SchemaInfo.title` as a single positional argument. - *ruff_check - Optional [ruff rule codes](https://docs.astral.sh/ruff/rules/), - each prefixed with `--select ` and follow a `ruff check --fix ` call. - - If not provided, uses `[tool.ruff.lint.select]` from `pyproject.toml`. - ruff_format - Optional argument list supplied to [ruff format](https://docs.astral.sh/ruff/formatter/#ruff-format) - - Attributes - ---------- - _literals: dict[str, str] - `{alias_name: literal_statement}` - _literals_invert: dict[str, str] - `{literal_statement: alias_name}` - aliases: list[tuple[str, str]] - `_literals` sorted by `alias_name` - _imports: Sequence[str] - Prefined import statements to appear at beginning of module. 
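Examples
--------
A sketch of how the code generators drive the shared instance defined below (``si`` stands in for a ``SchemaInfo`` wrapping an enum)::

    TypeAliasTracer.update_aliases(("Map", "Mapping[str, Any]"))
    py_type = TypeAliasTracer.add_literal(si, spell_literal(si.literal), replace=True)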
- """ - - def __init__( - self, - fmt: str = "{}_T", - *ruff_check: str, - ruff_format: Sequence[str] | None = None, - ) -> None: - self.fmt: str = fmt - self._literals: dict[str, str] = {} - self._literals_invert: dict[str, str] = {} - self._aliases: dict[str, str] = {} - self._imports: Sequence[str] = ( - "from __future__ import annotations\n", - "from typing import Any, Literal, Mapping, TypeVar, Sequence, Union", - "from typing_extensions import TypeAlias, TypeAliasType", - ) - self._cmd_check: list[str] = ["--fix"] - self._cmd_format: Sequence[str] = ruff_format or () - for c in ruff_check: - self._cmd_check.extend(("--extend-select", c)) - - def _update_literals(self, name: str, tp: str, /) -> None: - """Produces an inverted index, to reuse a `Literal` when `SchemaInfo.title` is empty.""" - self._literals[name] = tp - self._literals_invert[tp] = name - - def add_literal( - self, info: SchemaInfo, tp: str, /, *, replace: bool = False - ) -> str: - """ - `replace=True` returns the eventual alias name. - - - Doing so will mean that the `_typing` module must be written first, before the source of `info`. - - Otherwise, `ruff` will raise an error during `check`/`format`, as the import will be invalid. - - Where a `title` is not found, an attempt will be made to find an existing alias definition that had one. - """ - if info.title: - alias = self.fmt.format(info.title) - if alias not in self._literals: - self._update_literals(alias, tp) - if replace: - tp = alias - elif (alias := self._literals_invert.get(tp)) and replace: - tp = alias - elif replace and info.is_union_literal(): - # Handles one very specific edge case `WindowFieldDef` - # - Has an anonymous enum union - # - One of the members is declared afterwards - # - SchemaBase needs to be first, as the union wont be internally sorted - it = ( - self.add_literal(el, spell_literal(el.literal), replace=True) - for el in info.anyOf - ) - tp = f"Union[SchemaBase, {', '.join(it)}]" - return tp - - def update_aliases(self, *name_statement: tuple[str, str]) -> None: - """ - Adds `(name, statement)` pairs to the definitions. - - These types should support annotations in generated code, but - are not required to be derived from the schema itself. - - Each tuple will appear in the generated module as:: - - name: TypeAlias = statement - - All aliases will be written in runtime-scope, therefore - externally dependent types should be declared as regular imports. - """ - self._aliases.update(name_statement) - - def generate_aliases(self) -> Iterator[str]: - """Represents a line per `TypeAlias` declaration.""" - for name, statement in self._aliases.items(): - yield f"{name}: TypeAlias = {statement}" - - def is_cached(self, tp: str, /) -> bool: - """ - Applies to both docstring and type hints. - - Currently used as a sort key, to place literals/aliases last. - """ - return tp in self._literals_invert or tp in self._literals or tp in self._aliases # fmt: skip - - def write_module( - self, fp: Path, *extra_all: str, header: LiteralString, extra: LiteralString - ) -> None: - """ - Write all collected `TypeAlias`'s to `fp`. - - Parameters - ---------- - fp - Path to new module. - *extra_all - Any manually spelled types to be exported. - header - `tools.generate_schema_wrapper.HEADER`. - extra - `tools.generate_schema_wrapper.TYPING_EXTRA`. 
- """ - ruff_format = ["ruff", "format", fp] - if self._cmd_format: - ruff_format.extend(self._cmd_format) - commands = (["ruff", "check", fp, *self._cmd_check], ruff_format) - static = (header, "\n", *self._imports, "\n\n") - self.update_aliases(*sorted(self._literals.items(), key=itemgetter(0))) - all_ = [*iter(self._aliases), *extra_all] - it = chain( - static, - [f"__all__ = {all_}", "\n\n", extra], - self.generate_aliases(), - ) - fp.write_text("\n".join(it), encoding="utf-8") - for cmd in commands: - r = subprocess.run(cmd, check=True) - r.check_returncode() - - @property - def n_entries(self) -> int: - """Number of unique `TypeAlias` defintions collected.""" - return len(self._literals) - - -TypeAliasTracer: _TypeAliasTracer = _TypeAliasTracer("{}_T", "I001", "I002") -"""An instance of `_TypeAliasTracer`. - -Collects a cache of unique `Literal` types used globally. - -These are then converted to `TypeAlias` statements, written to another module. - -Allows for a single definition to be reused multiple times, -rather than repeating long literals in every method definition. -""" - - -def get_valid_identifier( - prop: str, - replacement_character: str = "", - allow_unicode: bool = False, - url_decode: bool = True, -) -> str: - """ - Given a string property, generate a valid Python identifier. - - Parameters - ---------- - prop: string - Name of property to decode. - replacement_character: string, default '' - The character to replace invalid characters with. - allow_unicode: boolean, default False - If True, then allow Python 3-style unicode identifiers. - url_decode: boolean, default True - If True, decode URL characters in identifier names. - - Examples - -------- - >>> get_valid_identifier("my-var") - 'myvar' - - >>> get_valid_identifier("if") - 'if_' - - >>> get_valid_identifier("$schema", "_") - '_schema' - - >>> get_valid_identifier("$*#$") - '_' - - >>> get_valid_identifier("Name%3Cstring%3E") - 'Namestring' - """ - # Decode URL characters. - if url_decode: - prop = urllib.parse.unquote(prop) - - # Deal with [] - prop = prop.replace("[]", "Array") - - # First substitute-out all non-valid characters. - flags = re.UNICODE if allow_unicode else re.ASCII - valid = re.sub(r"\W", replacement_character, prop, flags=flags) - - # If nothing is left, use just an underscore - if not valid: - valid = "_" - - # first character must be a non-digit. Prefix with an underscore - # if needed - if re.match(r"^[\d\W]", valid): - valid = "_" + valid - - # if the result is a reserved keyword, then add an underscore at the end - if keyword.iskeyword(valid): - valid += "_" - return valid - - -def is_valid_identifier(var: str, allow_unicode: bool = False): - """ - Return true if var contains a valid Python identifier. - - Parameters - ---------- - val : string - identifier to check - allow_unicode : bool (default: False) - if True, then allow Python 3 style unicode identifiers. 
- """ - flags = re.UNICODE if allow_unicode else re.ASCII - is_valid = re.match(r"^[^\d\W]\w*\Z", var, flags) - return is_valid and not keyword.iskeyword(var) - - -class SchemaProperties: - """A wrapper for properties within a schema.""" - - def __init__( - self, - properties: dict[str, Any], - schema: dict, - rootschema: dict | None = None, - ) -> None: - self._properties = properties - self._schema = schema - self._rootschema = rootschema or schema - - def __bool__(self) -> bool: - return bool(self._properties) - - def __dir__(self) -> list[str]: - return list(self._properties.keys()) - - def __getattr__(self, attr): - try: - return self[attr] - except KeyError: - return super().__getattr__(attr) - - def __getitem__(self, attr): - dct = self._properties[attr] - if "definitions" in self._schema and "definitions" not in dct: - dct = dict(definitions=self._schema["definitions"], **dct) - return SchemaInfo(dct, self._rootschema) - - def __iter__(self): - return iter(self._properties) - - def items(self): - return ((key, self[key]) for key in self) - - def keys(self): - return self._properties.keys() - - def values(self): - return (self[key] for key in self) - - -class SchemaInfo: - """A wrapper for inspecting a JSON schema.""" - - def __init__( - self, schema: dict[str, Any], rootschema: dict[str, Any] | None = None - ) -> None: - if not rootschema: - rootschema = schema - self.raw_schema = schema - self.rootschema = rootschema - self.schema = resolve_references(schema, rootschema) - - def child(self, schema: dict) -> SchemaInfo: - return self.__class__(schema, rootschema=self.rootschema) - - def __repr__(self) -> str: - keys = [] - for key in sorted(self.schema.keys()): - val = self.schema[key] - rval = repr(val).replace("\n", "") - if len(rval) > 30: - rval = rval[:30] + "..." - if key == "definitions": - rval = "{...}" - elif key == "properties": - rval = "{\n " + "\n ".join(sorted(map(repr, val))) + "\n }" - keys.append(f'"{key}": {rval}') - return "SchemaInfo({\n " + "\n ".join(keys) + "\n})" - - @property - def title(self) -> str: - if self.is_reference(): - return get_valid_identifier(self.refname) - else: - return "" - - @overload - def get_python_type_representation( - self, - for_type_hints: bool = ..., - return_as_str: Literal[True] = ..., - additional_type_hints: list[str] | None = ..., - ) -> str: ... - @overload - def get_python_type_representation( - self, - for_type_hints: bool = ..., - return_as_str: Literal[False] = ..., - additional_type_hints: list[str] | None = ..., - ) -> list[str]: ... - def get_python_type_representation( # noqa: C901 - self, - for_type_hints: bool = False, - return_as_str: bool = True, - additional_type_hints: list[str] | None = None, - ) -> str | list[str]: - type_representations: list[str] = [] - """ - All types which can be used for the current `SchemaInfo`. - Including `altair` classes, standard `python` types, etc. - """ - - if self.title: - if for_type_hints: - # To keep type hints simple, we only use the SchemaBase class - # as the type hint for all classes which inherit from it. - class_names = ["SchemaBase"] - if self.title in {"ExprRef", "ParameterExtent"}: - class_names.append("Parameter") - # In these cases, a value parameter is also always accepted. - # It would be quite complex to further differentiate - # between a value and a selection parameter based on - # the type system (one could - # try to check for the type of the Parameter.param attribute - # but then we would need to write some overload signatures for - # api.param). 
- - type_representations.extend(class_names) - else: - # use RST syntax for generated sphinx docs - type_representations.append(rst_syntax_for_class(self.title)) - - if self.is_empty(): - type_representations.append("Any") - elif self.is_literal(): - tp_str = spell_literal(self.literal) - if for_type_hints: - tp_str = TypeAliasTracer.add_literal(self, tp_str, replace=True) - type_representations.append(tp_str) - elif for_type_hints and self.is_union_literal(): - it = chain.from_iterable(el.literal for el in self.anyOf) - tp_str = TypeAliasTracer.add_literal(self, spell_literal(it), replace=True) - type_representations.append(tp_str) - elif self.is_anyOf(): - it = ( - s.get_python_type_representation( - for_type_hints=for_type_hints, return_as_str=False - ) - for s in self.anyOf - ) - type_representations.extend(maybe_rewrap_literal(chain.from_iterable(it))) - elif isinstance(self.type, list): - options = [] - subschema = SchemaInfo(dict(**self.schema)) - for typ_ in self.type: - subschema.schema["type"] = typ_ - # We always use title if possible for nested objects - options.append( - subschema.get_python_type_representation( - for_type_hints=for_type_hints - ) - ) - type_representations.extend(options) - elif self.is_object() and not for_type_hints: - type_representations.append("dict") - elif self.is_array(): - # A list is invariant in its type parameter. This means that e.g. - # List[str] is not a subtype of List[Union[core.FieldName, str]] - # and hence we would need to explicitly write out the combinations, - # so in this case: - # List[core.FieldName], List[str], List[Union[core.FieldName, str]] - # However, this can easily explode to too many combinations. - # Furthermore, we would also need to add additional entries - # for e.g. int wherever a float is accepted which would lead to very - # long code. - # As suggested in the mypy docs, - # https://mypy.readthedocs.io/en/stable/common_issues.html#variance, - # we revert to using Sequence which works as well for lists and also - # includes tuples which are also supported by the SchemaBase.to_dict - # method. However, it is not entirely accurate as some sequences - # such as e.g. a range are not supported by SchemaBase.to_dict but - # this tradeoff seems worth it. - s = self.child(self.items).get_python_type_representation( - for_type_hints=for_type_hints - ) - type_representations.append(f"Sequence[{s}]") - elif self.type in jsonschema_to_python_types: - type_representations.append(jsonschema_to_python_types[self.type]) - else: - msg = "No Python type representation available for this schema" - raise ValueError(msg) - - # Shorter types are usually the more relevant ones, e.g. `str` instead - # of `SchemaBase`. Output order from set is non-deterministic -> If - # types have same length names, order would be non-deterministic as it is - # returned from sort. Hence, we also sort by type name as a tie-breaker, - # see https://docs.python.org/3.10/howto/sorting.html#sort-stability-and-complex-sorts - # for more info. 
- # Using lower as we don't want to prefer uppercase names such as "None" over lowercase ones. - it = sorted(set(flatten(type_representations)), key=str.lower) # Tertiary sort - it = sorted(it, key=len) # Secondary sort - type_representations = sorted(it, key=TypeAliasTracer.is_cached) # Primary sort - if additional_type_hints: - type_representations.extend(additional_type_hints) - - if return_as_str: - type_representations_str = ", ".join(type_representations) - # If it's not for_type_hints but instead for the docstrings, we don't want - # to include Union as it just clutters the docstrings. - if len(type_representations) > 1 and for_type_hints: - # Use parameterised `TypeAlias` instead of exposing `UndefinedType` - # `Union` is collapsed by `ruff` later - if type_representations_str.endswith(", UndefinedType"): - s = type_representations_str.replace(", UndefinedType", "") - s = f"Optional[Union[{s}]]" - else: - s = f"Union[{type_representations_str}]" - return s - return type_representations_str - else: - return type_representations - - @property - def properties(self) -> SchemaProperties: - return SchemaProperties( - self.schema.get("properties", {}), self.schema, self.rootschema - ) - - @property - def definitions(self) -> SchemaProperties: - return SchemaProperties( - self.schema.get("definitions", {}), self.schema, self.rootschema - ) - - @property - def required(self) -> list: - return self.schema.get("required", []) - - @property - def patternProperties(self) -> dict: - return self.schema.get("patternProperties", {}) - - @property - def additionalProperties(self) -> bool: - return self.schema.get("additionalProperties", True) - - @property - def type(self) -> str | list[Any] | None: - return self.schema.get("type", None) - - @property - def anyOf(self) -> list[SchemaInfo]: - return [self.child(s) for s in self.schema.get("anyOf", [])] - - @property - def oneOf(self) -> list[SchemaInfo]: - return [self.child(s) for s in self.schema.get("oneOf", [])] - - @property - def allOf(self) -> list[SchemaInfo]: - return [self.child(s) for s in self.schema.get("allOf", [])] - - @property - def not_(self) -> SchemaInfo: - return self.child(self.schema.get("not", {})) - - @property - def items(self) -> dict: - return self.schema.get("items", {}) - - @property - def enum(self) -> list[str]: - return self.schema.get("enum", []) - - @property - def const(self) -> str: - return self.schema.get("const", "") - - @property - def literal(self) -> list[str]: - return self.schema.get("enum", [self.const]) - - @property - def refname(self) -> str: - return self.raw_schema.get("$ref", "#/").split("/")[-1] - - @property - def ref(self) -> str | None: - return self.raw_schema.get("$ref", None) - - @property - def description(self) -> str: - return self._get_description(include_sublevels=False) - - @property - def deep_description(self) -> str: - return self._get_description(include_sublevels=True) - - def _get_description(self, include_sublevels: bool = False) -> str: - desc = self.raw_schema.get("description", self.schema.get("description", "")) - if not desc and include_sublevels: - for item in self.anyOf: - sub_desc = item._get_description(include_sublevels=False) - if desc and sub_desc: - raise ValueError( - "There are multiple potential descriptions which could" - + " be used for the currently inspected schema. 
You'll need to" - + " clarify which one is the correct one.\n" - + str(self.schema) - ) - if sub_desc: - desc = sub_desc - return desc - - def is_reference(self) -> bool: - return "$ref" in self.raw_schema - - def is_enum(self) -> bool: - return "enum" in self.schema - - def is_const(self) -> bool: - return "const" in self.schema - - def is_literal(self) -> bool: - return not ({"enum", "const"}.isdisjoint(self.schema)) - - def is_empty(self) -> bool: - return not (set(self.schema.keys()) - set(EXCLUDE_KEYS)) - - def is_compound(self) -> bool: - return any(key in self.schema for key in ["anyOf", "allOf", "oneOf"]) - - def is_anyOf(self) -> bool: - return "anyOf" in self.schema - - def is_allOf(self) -> bool: - return "allOf" in self.schema - - def is_oneOf(self) -> bool: - return "oneOf" in self.schema - - def is_not(self) -> bool: - return "not" in self.schema - - def is_object(self) -> bool: - if self.type == "object": - return True - elif self.type is not None: - return False - elif ( - self.properties - or self.required - or self.patternProperties - or self.additionalProperties - ): - return True - else: - msg = "Unclear whether schema.is_object() is True" - raise ValueError(msg) - - def is_value(self) -> bool: - return not self.is_object() - - def is_array(self) -> bool: - return self.type == "array" - - def is_union(self) -> bool: - """ - Candidate for ``Union`` type alias. - - Not a real class. - """ - return self.is_anyOf() and self.type is None - - def is_union_literal(self) -> bool: - """ - Candidate for reducing to a single ``Literal`` alias. - - E.g. `BinnedTimeUnit` - """ - return self.is_union() and all(el.is_literal() for el in self.anyOf) - - -class RSTRenderer(_RSTRenderer): - def __init__(self) -> None: - super().__init__() - - def inline_html(self, token: dict[str, Any], state: BlockState) -> str: - html = token["raw"] - return rf"\ :raw-html:`{html}`\ " - - -class RSTParse(mistune.Markdown): - def __init__( - self, - renderer: mistune.BaseRenderer, - block: mistune.BlockParser | None = None, - inline: mistune.InlineParser | None = None, - plugins=None, - ) -> None: - super().__init__(renderer, block, inline, plugins) - - def __call__(self, s: str) -> str: - s = super().__call__(s) - return unescape(s).replace(r"\ ,", ",").replace(r"\ ", " ") - - -rst_parse: RSTParse = RSTParse(RSTRenderer()) - - -def indent_docstring( # noqa: C901 - lines: list[str], indent_level: int, width: int = 100, lstrip=True -) -> str: - """Indent a docstring for use in generated code.""" - final_lines = [] - if len(lines) > 1: - lines += [""] - - for i, line in enumerate(lines): - stripped = line.lstrip() - if stripped: - leading_space = len(line) - len(stripped) - indent = indent_level + leading_space - wrapper = textwrap.TextWrapper( - width=width - indent, - initial_indent=indent * " ", - subsequent_indent=indent * " ", - break_long_words=False, - break_on_hyphens=False, - drop_whitespace=True, - ) - list_wrapper = textwrap.TextWrapper( - width=width - indent, - initial_indent=indent * " " + "* ", - subsequent_indent=indent * " " + " ", - break_long_words=False, - break_on_hyphens=False, - drop_whitespace=True, - ) - for line in stripped.split("\n"): - line_stripped = line.lstrip() - line_stripped = fix_docstring_issues(line_stripped) - if line_stripped == "": - final_lines.append("") - elif line_stripped.startswith("* "): - final_lines.extend(list_wrapper.wrap(line_stripped[2:])) - # Matches lines where an attribute is mentioned followed by the accepted - # types (lines starting with a 
character sequence that - # does not contain white spaces or '*' followed by ' : '). - # It therefore matches 'condition : anyOf(...' but not '**Notes** : ...' - # These lines should not be wrapped at all but appear on one line - elif re.match(r"[^\s*]+ : ", line_stripped): - final_lines.append(indent * " " + line_stripped) - else: - final_lines.extend(wrapper.wrap(line_stripped)) - - # If this is the last line, put in an indent - elif i + 1 == len(lines): - final_lines.append(indent_level * " ") - # If it's not the last line, this is a blank line that should not indent. - else: - final_lines.append("") - # Remove any trailing whitespaces on the right side - stripped_lines = [] - for i, line in enumerate(final_lines): - if i + 1 == len(final_lines): - stripped_lines.append(line) - else: - stripped_lines.append(line.rstrip()) - # Join it all together - wrapped = "\n".join(stripped_lines) - if lstrip: - wrapped = wrapped.lstrip() - return wrapped - - -def fix_docstring_issues(docstring: str) -> str: - # All lists should start with '*' followed by a whitespace. Fixes the ones - # which either do not have a whitespace and/or start with '-' by first replacing - # "-" with "*" and then adding a whitespace where necessary - docstring = re.sub( - r"^-(?=[ `\"a-z])", - "*", - docstring, - flags=re.MULTILINE, - ) - # Now add a whitespace where an asterisk is followed by one of the characters - # in the square brackets of the regex pattern - docstring = re.sub( - r"^\*(?=[`\"a-z])", - "* ", - docstring, - flags=re.MULTILINE, - ) - - # Links to the vega-lite documentation cannot be relative but instead need to - # contain the full URL. - docstring = docstring.replace( - "types#datetime", "https://vega.github.io/vega-lite/docs/datetime.html" - ) - return docstring - - -def rst_syntax_for_class(class_name: str) -> str: - return f":class:`{class_name}`" - - -def flatten(container: Iterable) -> Iterable: - """ - Flatten an arbitrarily nested list. - - From https://stackoverflow.com/a/10824420 - """ - for i in container: - if isinstance(i, (list, tuple)): - yield from flatten(i) - else: - yield i - - -def spell_literal(it: Iterable[str], /, *, quote: bool = True) -> str: - """ - Combine individual ``str`` type reprs into a single ``Literal``. - - Parameters - ---------- - it - Type representations. - quote - Call ``repr()`` on each element in ``it``. - - .. note:: - Set to ``False`` if performing a second pass. - """ - it_el: Iterable[str] = (f"{s!r}" for s in it) if quote else it - return f"Literal[{', '.join(it_el)}]" - - -def maybe_rewrap_literal(it: Iterable[str], /) -> Iterator[str]: - """ - Where `it` may contain one or more `"enum"`, `"const"`, flatten to a single `Literal[...]`. - - All other type representations are yielded unchanged. - """ - seen: set[str] = set() - for s in it: - if s.startswith("Literal["): - seen.add(unwrap_literal(s)) - else: - yield s - if seen: - yield spell_literal(sorted(seen), quote=False) - - -def unwrap_literal(tp: str, /) -> str: - """`"Literal['value']"` -> `"value"`.""" - return re.sub(r"Literal\[(.+)\]", r"\g<1>", tp) - - -def ruff_format_str(code: str | list[str]) -> str: - if isinstance(code, list): - code = "\n".join(code) - - r = subprocess.run( - # Name of the file does not seem to matter but ruff requires one - ["ruff", "format", "--stdin-filename", "placeholder.py"], - input=code.encode(), - check=True, - capture_output=True, - ) - return r.stdout.decode() - - -def ruff_format_py(fp: Path, /, *extra_args: str) -> None: - """ - Format an existing file. 
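The literal helpers above compose as follows; this is a direct usage sketch of `maybe_rewrap_literal`, which collapses scattered `Literal[...]` entries into one while passing other types through unchanged:

```python
reps = ["str", "Literal['left']", "Literal['right']", "SchemaBase"]
print(list(maybe_rewrap_literal(reps)))
# -> ['str', 'SchemaBase', "Literal['left', 'right']"]
```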
- - Running on `win32` after writing lines will ensure "lf" is used before: - ```bash - ruff format --diff --check . - ``` - """ - cmd = ["ruff", "format", fp] - if extra_args: - cmd.extend(extra_args) - r = subprocess.run(cmd, check=True) - r.check_returncode() - - -def ruff_write_lint_format_str( - fp: Path, code: str | Iterable[str], /, *, encoding: str = "utf-8" -) -> None: - """ - Combined steps of writing, `ruff check`, `ruff format`. - - Notes - ----- - - `fp` is written to first, as the size before formatting will be the smallest - - Better utilizes `ruff` performance, rather than `python` str and io - - `code` is no longer bound to `list` - - Encoding set as default - - `I001/2` are `isort` rules, to sort imports. - """ - commands = ( - ["ruff", "check", fp, "--fix"], - ["ruff", "check", fp, "--fix", "--select", "I001", "--select", "I002"], - ) - if not isinstance(code, str): - code = "\n".join(code) - fp.write_text(code, encoding=encoding) - for cmd in commands: - r = subprocess.run(cmd, check=True) - r.check_returncode() - ruff_format_py(fp) diff --git a/tools/test.py b/tools/test.py deleted file mode 100644 index d041a952..00000000 --- a/tools/test.py +++ /dev/null @@ -1,11 +0,0 @@ -from generated_classes import AggregateExpression, AggregateTransform - -def test_aggregate_expression(): - pass - -def test_aggregate_transform(): - pass -if __name__ == "__main__": - test_aggregate_expression() - test_aggregate_transform() - print("All tests passed!") diff --git a/tools/testingSchema.json b/tools/testingSchema.json deleted file mode 100644 index 822e7c1c..00000000 --- a/tools/testingSchema.json +++ /dev/null @@ -1,259 +0,0 @@ -{ - "$ref": "#/definitions/Spec", - "$schema": "http://json-schema.org/draft-07/schema#", - "definitions": { - "AggregateExpression": { - "additionalProperties": false, - "description": "A custom SQL aggregate expression.", - "properties": { - "agg": { - "description": "A SQL expression string to calculate an aggregate value. Embedded Param references, such as `SUM($param + 1)`, are supported. For expressions without aggregate functions, use *sql* instead.", - "type": "string" - }, - "label": { - "description": "A label for this expression, for example to label a plot axis.", - "type": "string" - } - }, - "required": ["agg"], - "type": "object" - }, - "AggregateTransform": { - "anyOf": [ - { - "$ref": "#/definitions/Argmax" - }, - { - "$ref": "#/definitions/Argmin" - }, - { - "$ref": "#/definitions/Avg" - }, - { - "$ref": "#/definitions/Count" - }, - { - "$ref": "#/definitions/Max" - }, - { - "$ref": "#/definitions/Min" - }, - { - "$ref": "#/definitions/First" - }, - { - "$ref": "#/definitions/Last" - }, - { - "$ref": "#/definitions/Median" - }, - { - "$ref": "#/definitions/Mode" - }, - { - "$ref": "#/definitions/Product" - }, - { - "$ref": "#/definitions/Quantile" - }, - { - "$ref": "#/definitions/Stddev" - }, - { - "$ref": "#/definitions/StddevPop" - }, - { - "$ref": "#/definitions/Sum" - }, - { - "$ref": "#/definitions/Variance" - }, - { - "$ref": "#/definitions/VarPop" - } - ], - "description": "An aggregate transform that combines multiple values." 
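For reference, a document satisfying the `AggregateExpression` definition above can be checked with the third-party `jsonschema` package; the validation step is an assumption for illustration, not something the deleted tooling performed:

```python
import json
from pathlib import Path

import jsonschema  # third-party validator; assumed available

schema = json.loads(Path("tools/testingSchema.json").read_text())
# Re-point the root $ref at the definition under test.
subschema = {**schema, "$ref": "#/definitions/AggregateExpression"}

# "agg" is required; "label" is optional.
jsonschema.validate({"agg": "SUM($param + 1)", "label": "total"}, subschema)
```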
- }, - "Argmax": { - "additionalProperties": false, - "properties": { - "argmax": { - "description": "Find a value of the first column that maximizes the second column.", - "items": { - "description": "A transform argument.", - "type": [ - "string", - "number", - "boolean" - ] - }, - "maxItems": 2, - "minItems": 2, - "type": "array" - }, - "distinct": { - "type": "boolean" - }, - "orderby": { - "anyOf": [ - { - "$ref": "#/definitions/TransformField" - }, - { - "items": { - "$ref": "#/definitions/TransformField" - }, - "type": "array" - } - ] - }, - "partitionby": { - "anyOf": [ - { - "$ref": "#/definitions/TransformField" - }, - { - "items": { - "$ref": "#/definitions/TransformField" - }, - "type": "array" - } - ] - }, - "range": { - "anyOf": [ - { - "items": { - "type": [ - "number", - "null" - ] - }, - "type": "array" - }, - { - "$ref": "#/definitions/ParamRef" - } - ] - }, - "rows": { - "anyOf": [ - { - "items": { - "type": [ - "number", - "null" - ] - }, - "type": "array" - }, - { - "$ref": "#/definitions/ParamRef" - } - ] - } - }, - "required": [ - "argmax" - ], - "type": "object" - }, - "Argmin": { - "additionalProperties": false, - "properties": { - "argmin": { - "description": "Find a value of the first column that minimizes the second column.", - "items": { - "description": "A transform argument.", - "type": [ - "string", - "number", - "boolean" - ] - }, - "maxItems": 2, - "minItems": 2, - "type": "array" - }, - "distinct": { - "type": "boolean" - }, - "orderby": { - "anyOf": [ - { - "$ref": "#/definitions/TransformField" - }, - { - "items": { - "$ref": "#/definitions/TransformField" - }, - "type": "array" - } - ] - }, - "partitionby": { - "anyOf": [ - { - "$ref": "#/definitions/TransformField" - }, - { - "items": { - "$ref": "#/definitions/TransformField" - }, - "type": "array" - } - ] - }, - "range": { - "anyOf": [ - { - "items": { - "type": [ - "number", - "null" - ] - }, - "type": "array" - }, - { - "$ref": "#/definitions/ParamRef" - } - ] - }, - "rows": { - "anyOf": [ - { - "items": { - "type": [ - "number", - "null" - ] - }, - "type": "array" - }, - { - "$ref": "#/definitions/ParamRef" - } - ] - } - }, - "required": [ - "argmin" - ], - "type": "object" - }, "TransformField": { - "anyOf": [ - { - "type": "string" - }, - { - "$ref": "#/definitions/ParamRef" - } - ], - "description": "A field argument to a data transform." - }, - "ParamRef": { - "type": "string" - } - } -}
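Likewise, an `Argmax` transform instance must supply a two-element `[column, objective]` pair; a minimal validity check in the same style (again assuming the `jsonschema` package):

```python
import json
from pathlib import Path

import jsonschema  # third-party; assumed available

schema = json.loads(Path("tools/testingSchema.json").read_text())

# "Find a value of the first column that maximizes the second column."
jsonschema.validate(
    {"argmax": ["city", "temperature"], "distinct": True},
    {**schema, "$ref": "#/definitions/Argmax"},
)
```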