Feature: Num Quantiles into dev #982

Closed · wants to merge 14 commits into dev
1 change: 1 addition & 0 deletions .github/workflows/test-python-package.yml
@@ -8,6 +8,7 @@ on:
branches:
- 'main'
- 'feature/**'
- 'dev'

jobs:
build:
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
@@ -63,6 +63,7 @@ repos:
networkx>=2.5.1,
typing-extensions>=3.10.0.2,
HLL>=2.0.3,
datasketches>=4.1.0,

# requirements-dev.txt
check-manifest>=0.48,
@@ -109,7 +110,7 @@ repos:
additional_dependencies: ['h5py', 'wheel', 'future', 'numpy', 'pandas',
'python-dateutil', 'pytz', 'pyarrow', 'chardet', 'fastavro',
'python-snappy', 'charset-normalizer', 'psutil', 'scipy', 'requests',
'networkx','typing-extensions', 'HLL']
'networkx','typing-extensions', 'HLL', 'datasketches']
# Pyupgrade - standardize and modernize Python syntax for newer versions of the language
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.0
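The new datasketches pin is what backs this PR's quantile feature. As an illustration only (not code from this diff), the Apache DataSketches Python bindings expose a KLL sketch that streams values in bounded memory and answers approximate quantile queries — a minimal sketch, assuming the library's documented kll_floats_sketch API:

from datasketches import kll_floats_sketch

# k trades accuracy for sketch size; 200 is the library default.
sketch = kll_floats_sketch(200)
for value in range(10_000):
    sketch.update(float(value))

# Approximate median, correct to within the sketch's rank-error bound.
print(sketch.get_quantile(0.5))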
12 changes: 12 additions & 0 deletions dataprofiler/data_readers/csv_data.py
@@ -87,6 +87,7 @@ def __init__(
self._checked_header: bool = "header" in options and self._header != "auto"
self._default_delimiter: str = ","
self._default_quotechar: str = '"'
self._sample_nrows: Optional[int] = options.get("sample_nrows", None)

if data is not None:
self._load_data(data)
@@ -115,6 +116,11 @@ def header(self) -> Optional[Union[str, int]]:
"""Return header."""
return self._header

@property
def sample_nrows(self) -> Optional[int]:
"""Return sample_nrows."""
return self._sample_nrows

@property
def is_structured(self) -> bool:
"""Determine compatibility with StructuredProfiler."""
@@ -168,6 +174,10 @@ def _check_and_return_options(options: Optional[Dict]) -> Dict:
raise ValueError(
"'record_samples_per_line' must be an int " "more than 0"
)
if "sample_nrows" in options:
value = options["sample_nrows"]
if not isinstance(value, int) or value < 0:
raise ValueError("'sample_nrows' must be an int more than 0")
return options

@staticmethod
@@ -549,6 +559,7 @@ def _load_data_from_str(self, data_as_str: str) -> pd.DataFrame:
data_buffered,
self.delimiter,
cast(Optional[int], self.header),
self.sample_nrows,
self.selected_columns,
read_in_string=True,
)
@@ -595,6 +606,7 @@ def _load_data_from_file(self, input_file_path: str) -> pd.DataFrame:
input_file_path,
self.delimiter,
cast(Optional[int], self.header),
self.sample_nrows,
self.selected_columns,
read_in_string=True,
encoding=self.file_encoding,
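Together, the csv_data.py changes surface a new sample_nrows reader option, validate it, and thread it into both load paths. A minimal usage sketch, assuming the top-level dp.Data entry point forwards reader options the same way it does existing CSV options (the file name is hypothetical):

import dataprofiler as dp

# 'sample_nrows' asks the CSV reader to reservoir-sample that many rows
# rather than loading the whole file.
data = dp.Data("large_file.csv", options={"sample_nrows": 10_000})
print(data.data.shape)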
115 changes: 112 additions & 3 deletions dataprofiler/data_readers/data_utils.py
@@ -1,9 +1,13 @@
"""Contains functions for data readers."""
import json
import os
import random
import re
import urllib
from collections import OrderedDict
from io import BytesIO, StringIO, TextIOWrapper
from itertools import islice
from math import floor, log, log1p
from typing import (
Any,
Dict,
@@ -24,7 +28,7 @@
from chardet.universaldetector import UniversalDetector
from typing_extensions import TypeGuard

from .. import dp_logging
from .. import dp_logging, settings
from .._typing import JSONType, Url
from .filepath_or_buffer import FileOrBufferHandler, is_stream_buffer # NOQA

@@ -268,10 +272,106 @@ def read_json(
return lines


def reservoir(file: TextIOWrapper, sample_nrows: int) -> list:
"""
Implement the mathematical logic of Reservoir sampling.

:param file: wrapper of the opened csv file
:type file: TextIOWrapper
:param sample_nrows: number of rows to sample
:type sample_nrows: int

:raises: ValueError()

:return: sampled values
:rtype: list
"""
# Copyright 2021 Oscar Benjamin
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# https://gist.github.com/oscarbenjamin/4c1b977181f34414a425f68589e895d1

iterator = iter(file)
values = list(islice(iterator, sample_nrows))

irange = range(len(values))
indices = dict(zip(irange, irange))

kinv = 1 / sample_nrows
W = 1.0
rng = random.Random(x=settings._seed)
if "DATAPROFILER_SEED" in os.environ and settings._seed is None:
seed = os.environ.get("DATAPROFILER_SEED")
if seed:
rng = random.Random(int(seed))

while True:
W *= rng.random() ** kinv
# random() < 1.0 but random() ** kinv might not be
# W == 1.0 implies "infinite" skips
if W == 1.0:
break
# skip is geometrically distributed with parameter W
skip = floor(log(rng.random()) / log1p(-W))
try:
newval = next(islice(iterator, skip, skip + 1))
except StopIteration:
break
# Append new, replace old with dummy, and keep track of order
remove_index = rng.randrange(sample_nrows)
values[indices[remove_index]] = str(None)
indices[remove_index] = len(values)
values.append(newval)

values = [values[indices[i]] for i in irange]
return values
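The loop above is Li's "Algorithm L": the first sample_nrows lines fill the reservoir, and each subsequent replacement happens after a geometrically distributed skip, floor(log(U) / log(1 - W)) (log1p(-W) == log(1 - W)), so the sampler draws O(n log(M/n)) random numbers instead of one per row. A quick self-check sketch, using StringIO as a stand-in for the TextIOWrapper since the function only ever iterates lines:

from io import StringIO

lines = [f"row{i}\n" for i in range(1000)]
sample = reservoir(StringIO("".join(lines)), sample_nrows=10)
assert len(sample) == 10                     # reservoir size is exact
assert all(row in lines for row in sample)   # every pick is a real row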


def rsample(file_path: TextIOWrapper, sample_nrows: int, args: dict) -> StringIO:
"""
Implement Reservoir Sampling to sample n rows out of a total of M rows.

:param file_path: wrapper of the opened csv file to be read in
:type file_path: TextIOWrapper
:param sample_nrows: number of rows being sampled
:type sample_nrows: int
:param args: options to read the csv file
:type args: dict

:return: sampled rows as an in-memory text buffer
:rtype: StringIO
"""
header = args["header"]
result = []

if header is not None:
result = [[next(file_path) for i in range(header + 1)][-1]]
args["header"] = 0

result += reservoir(file_path, sample_nrows)

fo = StringIO("".join([i if (i[-1] == "\n") else i + "\n" for i in result]))
return fo


def read_csv_df(
file_path: Union[str, BytesIO, TextIOWrapper],
delimiter: Optional[str],
header: Optional[int],
sample_nrows: Optional[int] = None,
selected_columns: List[str] = [],
read_in_string: bool = False,
encoding: Optional[str] = "utf-8",
@@ -314,19 +414,28 @@ def read_csv_df(

# account for py3.6 requirement for pandas, can remove if >= py3.7
is_buf_wrapped = False
is_file_open = False
if isinstance(file_path, BytesIO):
# a BytesIO stream has to be wrapped in order to properly be detached
# in 3.6 this avoids read_csv wrapping the stream and closing too early
file_path = TextIOWrapper(file_path, encoding=encoding)
is_buf_wrapped = True

fo = pd.read_csv(file_path, **args)
elif isinstance(file_path, str):
file_path = open(file_path, encoding=encoding)
is_file_open = True

file_data = file_path
if sample_nrows:
file_data = rsample(file_path, sample_nrows, args)
fo = pd.read_csv(file_data, **args)
data = fo.read()

# if the buffer was wrapped, detach it before returning
if is_buf_wrapped:
file_path = cast(TextIOWrapper, file_path)
file_path.detach()
elif is_file_open:
file_path.close()
fo.close()

return data
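For callers of the internal helper, the new parameter sits between header and selected_columns. A hedged sketch of a direct call (the file path is hypothetical):

from dataprofiler.data_readers.data_utils import read_csv_df

# Parse 500 reservoir-sampled rows (or the whole file if it is smaller).
df = read_csv_df("transactions.csv", delimiter=",", header=0, sample_nrows=500)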
2 changes: 1 addition & 1 deletion dataprofiler/data_readers/graph_data.py
@@ -255,7 +255,7 @@ def _format_data_networkx(self) -> nx.Graph:
self.input_file_path,
self._delimiter,
cast(Optional[int], self._header),
[],
selected_columns=[],
read_in_string=True,
encoding=self.file_encoding,
)
4 changes: 3 additions & 1 deletion dataprofiler/labelers/base_data_labeler.py
@@ -637,7 +637,9 @@ def load_from_library(cls, name: str) -> BaseDataLabeler:
:return: DataLabeler class
:rtype: BaseDataLabeler
"""
return cls(os.path.join(default_labeler_dir, name))
labeler = cls(os.path.join(default_labeler_dir, name))
labeler._default_model_loc = name
return labeler

@classmethod
def load_from_disk(cls, dirpath: str, load_options: dict = None) -> BaseDataLabeler:
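With this change a labeler loaded from the library remembers its library name, which the data_labelers.py lookup further down depends on. A small illustration, assuming "structured_model" is a valid entry under the default labeler directory:

from dataprofiler.labelers.base_data_labeler import BaseDataLabeler

labeler = BaseDataLabeler.load_from_library("structured_model")
print(labeler._default_model_loc)  # now records "structured_model"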
6 changes: 3 additions & 3 deletions dataprofiler/labelers/base_model.py
@@ -32,7 +32,7 @@ def __new__(
class BaseModel(metaclass=abc.ABCMeta):
"""For labeling data."""

_BaseModel__subclasses: dict[str, type[BaseModel]] = {}
__subclasses: dict[str, type[BaseModel]] = {}
__metaclass__ = abc.ABCMeta

# boolean if the label mapping requires the mapping for index 0 reserved
@@ -90,7 +90,7 @@ def __eq__(self, other: object) -> bool:
def _register_subclass(cls) -> None:
"""Register a subclass for the class factory."""
if not inspect.isabstract(cls):
cls._BaseModel__subclasses[cls.__name__.lower()] = cls
cls.__subclasses[cls.__name__.lower()] = cls

@property
def label_mapping(self) -> dict[str, int]:
@@ -156,7 +156,7 @@ def get_class(cls, class_name: str) -> type[BaseModel] | None:
from .column_name_model import ColumnNameModel # NOQA
from .regex_model import RegexModel # NOQA

return cls._BaseModel__subclasses.get(class_name.lower(), None)
return cls.__subclasses.get(class_name.lower(), None)

def get_parameters(self, param_list: list[str] | None = None) -> dict:
"""
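These base_model.py edits are behavior-preserving: inside a class body Python mangles any __name attribute to _ClassName__name at compile time, so cls.__subclasses written in BaseModel resolves to the same _BaseModel__subclasses the old code spelled out by hand. A standalone sketch of the mechanism (illustrative names only):

class Base:
    __registry: dict = {}  # stored on Base as _Base__registry

    @classmethod
    def register(cls) -> None:
        # Compiled inside Base's body, so this resolves to _Base__registry
        # even when cls is a subclass.
        cls.__registry[cls.__name__.lower()] = cls

class Child(Base):
    pass

Child.register()
print(Base._Base__registry)  # {'child': <class '__main__.Child'>}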
5 changes: 4 additions & 1 deletion dataprofiler/labelers/data_labelers.py
@@ -102,7 +102,7 @@ def __new__( # type: ignore
trainable: bool = False,
) -> BaseDataLabeler:
"""
Create structured and unstructred data labeler objects.
Create structured and unstructured data labeler objects.

:param dirpath: Path to load data labeler
:type dirpath: str
@@ -143,6 +143,9 @@ def load_from_library(cls, name: str, trainable: bool = False) -> BaseDataLabeler:
"""
if trainable:
return TrainableDataLabeler.load_from_library(name)
for _, labeler_class_obj in cls.labeler_classes.items():
if name in labeler_class_obj._default_model_loc:
return labeler_class_obj()
return BaseDataLabeler.load_from_library(name)

@classmethod
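Combined with the base_data_labeler.py change above, load_from_library can now short-circuit: a name matching a registered class's _default_model_loc constructs that class directly rather than deserializing from disk. A hedged usage sketch (the labeler name is assumed from the library defaults):

import dataprofiler as dp

# Returns the registered labeler class's default instance when the name
# matches its _default_model_loc; otherwise falls back to disk loading.
labeler = dp.DataLabeler.load_from_library("structured_model")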